mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
Compare commits
6 Commits
cc2a2380b9
...
e26f90c514
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e26f90c514 | ||
|
|
f9a38a26b2 | ||
|
|
d32a6fbdd4 | ||
|
|
e954009439 | ||
|
|
59507711e8 | ||
|
|
86c1165c12 |
1673
poetry.lock
generated
1673
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -14,21 +14,21 @@ priority = "primary"
|
|||||||
[tool.poetry.dependencies]
|
[tool.poetry.dependencies]
|
||||||
python = "^3.10"
|
python = "^3.10"
|
||||||
playwright = "^1.41.1"
|
playwright = "^1.41.1"
|
||||||
nonebot-adapter-onebot = "^2.3.1"
|
nonebot-adapter-onebot = ">=2.3.1"
|
||||||
nonebot-plugin-apscheduler = "^0.5"
|
nonebot-plugin-apscheduler = "^0.5"
|
||||||
tortoise-orm = "^0.20.0"
|
tortoise-orm = "^0.20.0"
|
||||||
cattrs = "^23.2.3"
|
cattrs = "^23.2.3"
|
||||||
ruamel-yaml = "^0.18.5"
|
ruamel-yaml = "^0.18.5"
|
||||||
strenum = "^0.4.15"
|
strenum = "^0.4.15"
|
||||||
nonebot-plugin-session = "^0.2.3"
|
nonebot-plugin-session = "^0.3.2"
|
||||||
ujson = "^5.9.0"
|
ujson = ">=5.9.0"
|
||||||
nb-cli = "^1.3.0"
|
nb-cli = ">=1.3.0"
|
||||||
nonebot2 = { extras = ["fastapi"], version = "^2.3.3" }
|
nonebot2 = { extras = ["fastapi"], version = ">=2.3.3" }
|
||||||
pillow = "^10.0.0"
|
pillow = ">=10.0.0"
|
||||||
retrying = "^1.3.4"
|
retrying = "^1.3.4"
|
||||||
aiofiles = "^23.2.1"
|
aiofiles = "^23.2.1"
|
||||||
nonebot-plugin-htmlrender = ">=0.6.0,<1.0.0"
|
nonebot-plugin-htmlrender = ">=0.6.0,<1.0.0"
|
||||||
pypinyin = "^0.51.0"
|
pypinyin = ">=0.51.0"
|
||||||
beautifulsoup4 = "^4.12.3"
|
beautifulsoup4 = "^4.12.3"
|
||||||
lxml = "^5.1.0"
|
lxml = "^5.1.0"
|
||||||
psutil = "^5.9.8"
|
psutil = "^5.9.8"
|
||||||
@ -36,14 +36,14 @@ feedparser = "^6.0.11"
|
|||||||
imagehash = "^4.3.1"
|
imagehash = "^4.3.1"
|
||||||
cn2an = "^0.5.22"
|
cn2an = "^0.5.22"
|
||||||
dateparser = "^1.2.0"
|
dateparser = "^1.2.0"
|
||||||
bilireq = "0.2.3post0"
|
bilireq = ">=0.2.10"
|
||||||
python-jose = { extras = ["cryptography"], version = "^3.3.0" }
|
python-jose = { extras = ["cryptography"], version = "^3.3.0" }
|
||||||
python-multipart = "^0.0.9"
|
python-multipart = "^0.0.9"
|
||||||
aiocache = {extras = ["redis"], version = "^0.12.3"}
|
aiocache = {extras = ["redis"], version = "^0.12.3"}
|
||||||
py-cpuinfo = "^9.0.0"
|
py-cpuinfo = "^9.0.0"
|
||||||
nonebot-plugin-alconna = "^0.54.0"
|
nonebot-plugin-alconna = ">=0.56.0"
|
||||||
tenacity = "^9.0.0"
|
tenacity = "^9.0.0"
|
||||||
nonebot-plugin-uninfo = ">0.4.1"
|
nonebot-plugin-uninfo = ">=0.7.3"
|
||||||
nonebot-plugin-waiter = "^0.8.1"
|
nonebot-plugin-waiter = "^0.8.1"
|
||||||
multidict = ">=6.0.0,!=6.3.2"
|
multidict = ">=6.0.0,!=6.3.2"
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import random
|
import random
|
||||||
import time
|
|
||||||
|
|
||||||
from nonebot import on_message, on_request
|
from nonebot import on_message, on_request
|
||||||
from nonebot.adapters.onebot.v11 import (
|
from nonebot.adapters.onebot.v11 import (
|
||||||
@ -12,7 +11,6 @@ from nonebot.adapters.onebot.v11 import (
|
|||||||
from nonebot.adapters.onebot.v11 import Bot as v11Bot
|
from nonebot.adapters.onebot.v11 import Bot as v11Bot
|
||||||
from nonebot.adapters.onebot.v12 import Bot as v12Bot
|
from nonebot.adapters.onebot.v12 import Bot as v12Bot
|
||||||
from nonebot.plugin import PluginMetadata
|
from nonebot.plugin import PluginMetadata
|
||||||
from nonebot_plugin_apscheduler import scheduler
|
|
||||||
from nonebot_plugin_session import EventSession
|
from nonebot_plugin_session import EventSession
|
||||||
|
|
||||||
from zhenxun.configs.config import BotConfig, Config
|
from zhenxun.configs.config import BotConfig, Config
|
||||||
@ -66,19 +64,6 @@ __plugin_meta__ = PluginMetadata(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class Timer:
|
|
||||||
data: dict[str, float] = {} # noqa: RUF012
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def check(cls, uid: int | str):
|
|
||||||
return True if uid not in cls.data else time.time() - cls.data[uid] > 5 * 60
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def clear(cls):
|
|
||||||
now = time.time()
|
|
||||||
cls.data = {k: v for k, v in cls.data.items() if v - now < 5 * 60}
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: 其他平台请求
|
# TODO: 其他平台请求
|
||||||
|
|
||||||
friend_req = on_request(priority=5, block=True)
|
friend_req = on_request(priority=5, block=True)
|
||||||
@ -86,68 +71,70 @@ group_req = on_request(priority=5, block=True)
|
|||||||
_t = on_message(priority=999, block=False, rule=lambda: False)
|
_t = on_message(priority=999, block=False, rule=lambda: False)
|
||||||
|
|
||||||
|
|
||||||
cache = CacheRoot.cache_dict(
|
cache = CacheRoot.cache_dict("REQUEST_CACHE", 60, str)
|
||||||
"REQUEST_CACHE", (base_config.get("TIP_MESSAGE_LIMIT") or 360) * 60, str
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@friend_req.handle()
|
@friend_req.handle()
|
||||||
async def _(bot: v12Bot | v11Bot, event: FriendRequestEvent, session: EventSession):
|
async def _(bot: v12Bot | v11Bot, event: FriendRequestEvent, session: EventSession):
|
||||||
if event.user_id and Timer.check(event.user_id):
|
logger.debug("收录好友请求...", "好友请求", target=event.user_id)
|
||||||
logger.debug("收录好友请求...", "好友请求", target=event.user_id)
|
user = await bot.get_stranger_info(user_id=event.user_id)
|
||||||
user = await bot.get_stranger_info(user_id=event.user_id)
|
nickname = user["nickname"]
|
||||||
nickname = user["nickname"]
|
# sex = user["sex"]
|
||||||
# sex = user["sex"]
|
# age = str(user["age"])
|
||||||
# age = str(user["age"])
|
comment = event.comment
|
||||||
comment = event.comment
|
if base_config.get("AUTO_ADD_FRIEND"):
|
||||||
if base_config.get("AUTO_ADD_FRIEND"):
|
logger.debug(
|
||||||
logger.debug(
|
"已开启好友请求自动同意,成功通过该请求",
|
||||||
"已开启好友请求自动同意,成功通过该请求",
|
"好友请求",
|
||||||
"好友请求",
|
target=event.user_id,
|
||||||
target=event.user_id,
|
)
|
||||||
)
|
await asyncio.sleep(random.randint(1, 10))
|
||||||
await asyncio.sleep(random.randint(1, 10))
|
await bot.set_friend_add_request(flag=event.flag, approve=True)
|
||||||
await bot.set_friend_add_request(flag=event.flag, approve=True)
|
await FriendUser.create(
|
||||||
await FriendUser.create(
|
user_id=str(user["user_id"]), user_name=user["nickname"]
|
||||||
user_id=str(user["user_id"]), user_name=user["nickname"]
|
)
|
||||||
)
|
|
||||||
else:
|
|
||||||
# 旧请求全部设置为过期
|
|
||||||
await FgRequest.filter(
|
|
||||||
request_type=RequestType.FRIEND,
|
|
||||||
user_id=str(event.user_id),
|
|
||||||
handle_type__isnull=True,
|
|
||||||
).update(handle_type=RequestHandleType.EXPIRE)
|
|
||||||
f = await FgRequest.create(
|
|
||||||
request_type=RequestType.FRIEND,
|
|
||||||
platform=session.platform,
|
|
||||||
bot_id=bot.self_id,
|
|
||||||
flag=event.flag,
|
|
||||||
user_id=event.user_id,
|
|
||||||
nickname=nickname,
|
|
||||||
comment=comment,
|
|
||||||
)
|
|
||||||
cache_key = str(event.user_id)
|
|
||||||
if not cache.get(cache_key):
|
|
||||||
cache.set(cache_key, "1")
|
|
||||||
results = await PlatformUtils.send_superuser(
|
|
||||||
bot,
|
|
||||||
f"*****一份好友申请*****\n"
|
|
||||||
f"ID: {f.id}\n"
|
|
||||||
f"昵称:{nickname}({event.user_id})\n"
|
|
||||||
f"自动同意:{'√' if base_config.get('AUTO_ADD_FRIEND') else '×'}\n"
|
|
||||||
f"日期:{datetime.now().replace(microsecond=0)}\n"
|
|
||||||
f"备注:{event.comment}",
|
|
||||||
)
|
|
||||||
if message_ids := [
|
|
||||||
str(r[1].msg_ids[0]["message_id"])
|
|
||||||
for r in results
|
|
||||||
if r[1] and r[1].msg_ids
|
|
||||||
]:
|
|
||||||
f.message_ids = ",".join(message_ids)
|
|
||||||
await f.save(update_fields=["message_ids"])
|
|
||||||
else:
|
else:
|
||||||
logger.debug("好友请求五分钟内重复, 已忽略", "好友请求", target=event.user_id)
|
# 旧请求全部设置为过期
|
||||||
|
await FgRequest.filter(
|
||||||
|
request_type=RequestType.FRIEND,
|
||||||
|
user_id=str(event.user_id),
|
||||||
|
handle_type__isnull=True,
|
||||||
|
).update(handle_type=RequestHandleType.EXPIRE)
|
||||||
|
f = await FgRequest.create(
|
||||||
|
request_type=RequestType.FRIEND,
|
||||||
|
platform=session.platform,
|
||||||
|
bot_id=bot.self_id,
|
||||||
|
flag=event.flag,
|
||||||
|
user_id=event.user_id,
|
||||||
|
nickname=nickname,
|
||||||
|
comment=comment,
|
||||||
|
)
|
||||||
|
cache_key = str(event.user_id)
|
||||||
|
if not cache.get(cache_key):
|
||||||
|
cache.set(cache_key, "1")
|
||||||
|
results = await PlatformUtils.send_superuser(
|
||||||
|
bot,
|
||||||
|
f"*****一份好友申请*****\n"
|
||||||
|
f"ID: {f.id}\n"
|
||||||
|
f"昵称:{nickname}({event.user_id})\n"
|
||||||
|
f"自动同意:{'√' if base_config.get('AUTO_ADD_FRIEND') else '×'}\n"
|
||||||
|
f"日期:{datetime.now().replace(microsecond=0)}\n"
|
||||||
|
f"备注:{event.comment}",
|
||||||
|
)
|
||||||
|
if message_ids := [
|
||||||
|
str(r[1].msg_ids[0]["message_id"])
|
||||||
|
for r in results
|
||||||
|
if r[1] and r[1].msg_ids
|
||||||
|
]:
|
||||||
|
f.message_ids = ",".join(message_ids)
|
||||||
|
await f.save(update_fields=["message_ids"])
|
||||||
|
else:
|
||||||
|
tip_limit = base_config.get("TIP_MESSAGE_LIMIT") or 360
|
||||||
|
logger.debug(
|
||||||
|
f"好友请求{tip_limit}分钟内重复, 已忽略",
|
||||||
|
"好友请求",
|
||||||
|
target=cache_key,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@group_req.handle()
|
@group_req.handle()
|
||||||
@ -227,7 +214,7 @@ async def _(bot: v12Bot | v11Bot, event: GroupRequestEvent, session: EventSessio
|
|||||||
"\n在群组中 群组管理员与群主 允许使用管理员帮助"
|
"\n在群组中 群组管理员与群主 允许使用管理员帮助"
|
||||||
"(包括ban与功能开关等)\n请在群组中发送 '管理员帮助'",
|
"(包括ban与功能开关等)\n请在群组中发送 '管理员帮助'",
|
||||||
)
|
)
|
||||||
elif cache.get(f"{event.group_id}"):
|
elif not cache.get(f"{event.group_id}"):
|
||||||
cache.set(f"{event.group_id}", "1")
|
cache.set(f"{event.group_id}", "1")
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"收录 用户[{event.user_id}] 群聊[{event.group_id}] 群聊请求",
|
f"收录 用户[{event.user_id}] 群聊[{event.group_id}] 群聊请求",
|
||||||
@ -284,15 +271,3 @@ async def _(bot: v12Bot | v11Bot, event: GroupRequestEvent, session: EventSessio
|
|||||||
"群聊请求",
|
"群聊请求",
|
||||||
target=f"{event.user_id}:{event.group_id}",
|
target=f"{event.user_id}:{event.group_id}",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@scheduler.scheduled_job(
|
|
||||||
"interval",
|
|
||||||
minutes=5,
|
|
||||||
)
|
|
||||||
async def _():
|
|
||||||
Timer.clear()
|
|
||||||
|
|
||||||
|
|
||||||
async def _():
|
|
||||||
Timer.clear()
|
|
||||||
|
|||||||
7
zhenxun/services/cache/cache_containers.py
vendored
7
zhenxun/services/cache/cache_containers.py
vendored
@ -82,12 +82,7 @@ class CacheDict(Generic[T]):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
# 检查是否过期
|
# 检查是否过期
|
||||||
data = self._data[key]
|
return bool(self.expire_time(key))
|
||||||
if data.expire_time > 0 and data.expire_time < time.time():
|
|
||||||
del self._data[key]
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def get(self, key: str, default: Any = None) -> T | None:
|
def get(self, key: str, default: Any = None) -> T | None:
|
||||||
"""获取字典项,如果不存在返回默认值
|
"""获取字典项,如果不存在返回默认值
|
||||||
|
|||||||
@ -42,8 +42,8 @@ class BuildImage:
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
width: int = 0,
|
width: float = 0,
|
||||||
height: int = 0,
|
height: float = 0,
|
||||||
color: ColorAlias = (255, 255, 255),
|
color: ColorAlias = (255, 255, 255),
|
||||||
mode: ModeType = "RGBA",
|
mode: ModeType = "RGBA",
|
||||||
font: str | Path | FreeTypeFont = "HYWenHei-85W.ttf",
|
font: str | Path | FreeTypeFont = "HYWenHei-85W.ttf",
|
||||||
@ -63,12 +63,14 @@ class BuildImage:
|
|||||||
else:
|
else:
|
||||||
self.markImg = Image.open(background)
|
self.markImg = Image.open(background)
|
||||||
if width and height:
|
if width and height:
|
||||||
self.markImg = self.markImg.resize((width, height), Resampling.LANCZOS)
|
self.markImg = self.markImg.resize(
|
||||||
|
(int(width), int(height)), Resampling.LANCZOS
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
self.width = self.markImg.width
|
self.width = self.markImg.width
|
||||||
self.height = self.markImg.height
|
self.height = self.markImg.height
|
||||||
elif width and height:
|
elif width and height:
|
||||||
self.markImg = Image.new(mode, (width, height), color) # type: ignore
|
self.markImg = Image.new(mode, (int(width), int(height)), color)
|
||||||
else:
|
else:
|
||||||
raise ValueError("长度和宽度不能为空...")
|
raise ValueError("长度和宽度不能为空...")
|
||||||
self.draw = ImageDraw.Draw(self.markImg)
|
self.draw = ImageDraw.Draw(self.markImg)
|
||||||
@ -222,7 +224,7 @@ class BuildImage:
|
|||||||
text: str,
|
text: str,
|
||||||
font: str | FreeTypeFont | None = "HYWenHei-85W.ttf",
|
font: str | FreeTypeFont | None = "HYWenHei-85W.ttf",
|
||||||
font_size: int = 10,
|
font_size: int = 10,
|
||||||
) -> tuple[int, int]: # sourcery skip: remove-unnecessary-cast
|
) -> tuple[float, float]:
|
||||||
"""获取该字体下文本需要的长宽
|
"""获取该字体下文本需要的长宽
|
||||||
|
|
||||||
参数:
|
参数:
|
||||||
@ -231,20 +233,20 @@ class BuildImage:
|
|||||||
font_size: 字体大小
|
font_size: 字体大小
|
||||||
|
|
||||||
返回:
|
返回:
|
||||||
tuple[int, int]: 长宽
|
tuple[float, float]: 长宽
|
||||||
"""
|
"""
|
||||||
_font = font
|
_font = font
|
||||||
if font and type(font) is str:
|
if font and type(font) is str:
|
||||||
_font = cls.load_font(font, font_size)
|
_font = cls.load_font(font, font_size)
|
||||||
temp_image = Image.new("RGB", (1, 1), (255, 255, 255))
|
temp_image = Image.new("RGB", (1, 1), (255, 255, 255))
|
||||||
draw = ImageDraw.Draw(temp_image)
|
draw = ImageDraw.Draw(temp_image)
|
||||||
text_box = draw.textbbox((0, 0), str(text), font=_font) # type: ignore
|
text_box = draw.textbbox((0, 0), text, font=_font) # pyright: ignore[reportArgumentType]
|
||||||
text_width = text_box[2] - text_box[0]
|
text_width = text_box[2] - text_box[0]
|
||||||
text_height = text_box[3] - text_box[1]
|
text_height = text_box[3] - text_box[1]
|
||||||
return text_width, text_height + 10
|
return text_width, text_height + 10
|
||||||
# return _font.getsize(str(text)) # type: ignore
|
# return _font.getsize(str(text)) # type: ignore
|
||||||
|
|
||||||
def getsize(self, msg: str) -> tuple[int, int]:
|
def getsize(self, msg: str) -> tuple[float, float]:
|
||||||
# sourcery skip: remove-unnecessary-cast
|
# sourcery skip: remove-unnecessary-cast
|
||||||
"""
|
"""
|
||||||
获取文字在该图片 font_size 下所需要的空间
|
获取文字在该图片 font_size 下所需要的空间
|
||||||
@ -253,7 +255,7 @@ class BuildImage:
|
|||||||
msg: 文本
|
msg: 文本
|
||||||
|
|
||||||
返回:
|
返回:
|
||||||
tuple[int, int]: 长宽
|
tuple[float, float]: 长宽
|
||||||
"""
|
"""
|
||||||
temp_image = Image.new("RGB", (1, 1), (255, 255, 255))
|
temp_image = Image.new("RGB", (1, 1), (255, 255, 255))
|
||||||
draw = ImageDraw.Draw(temp_image)
|
draw = ImageDraw.Draw(temp_image)
|
||||||
@ -265,9 +267,9 @@ class BuildImage:
|
|||||||
|
|
||||||
def __center_xy(
|
def __center_xy(
|
||||||
self,
|
self,
|
||||||
pos: tuple[int, int],
|
pos: tuple[float, float],
|
||||||
width: int,
|
width: float,
|
||||||
height: int,
|
height: float,
|
||||||
center_type: CenterType | None,
|
center_type: CenterType | None,
|
||||||
) -> tuple[int, int]:
|
) -> tuple[int, int]:
|
||||||
"""
|
"""
|
||||||
@ -284,21 +286,21 @@ class BuildImage:
|
|||||||
# _width, _height = pos
|
# _width, _height = pos
|
||||||
if self.width and self.height:
|
if self.width and self.height:
|
||||||
if center_type == "center":
|
if center_type == "center":
|
||||||
width = int((self.width - width) / 2)
|
width = (self.width - width) / 2
|
||||||
height = int((self.height - height) / 2)
|
height = (self.height - height) / 2
|
||||||
elif center_type == "width":
|
elif center_type == "width":
|
||||||
width = int((self.width - width) / 2)
|
width = (self.width - width) / 2
|
||||||
height = pos[1]
|
height = pos[1]
|
||||||
elif center_type == "height":
|
elif center_type == "height":
|
||||||
width = pos[0]
|
width = pos[0]
|
||||||
height = int((self.height - height) / 2)
|
height = (self.height - height) / 2
|
||||||
return width, height
|
return int(width), int(height)
|
||||||
|
|
||||||
@run_sync
|
@run_sync
|
||||||
def paste(
|
def paste(
|
||||||
self,
|
self,
|
||||||
image: Self | tImage,
|
image: Self | tImage,
|
||||||
pos: tuple[int, int] = (0, 0),
|
pos: tuple[float, float] = (0, 0),
|
||||||
center_type: CenterType | None = None,
|
center_type: CenterType | None = None,
|
||||||
) -> Self:
|
) -> Self:
|
||||||
"""贴图
|
"""贴图
|
||||||
@ -370,7 +372,7 @@ class BuildImage:
|
|||||||
@run_sync
|
@run_sync
|
||||||
def text(
|
def text(
|
||||||
self,
|
self,
|
||||||
pos: tuple[int, int],
|
pos: tuple[float, float],
|
||||||
text: str,
|
text: str,
|
||||||
fill: str | tuple[int, int, int] = (0, 0, 0),
|
fill: str | tuple[int, int, int] = (0, 0, 0),
|
||||||
center_type: CenterType | None = None,
|
center_type: CenterType | None = None,
|
||||||
@ -430,7 +432,7 @@ class BuildImage:
|
|||||||
self.markImg.show()
|
self.markImg.show()
|
||||||
|
|
||||||
@run_sync
|
@run_sync
|
||||||
def resize(self, ratio: float = 0, width: int = 0, height: int = 0) -> Self:
|
def resize(self, ratio: float = 0, width: float = 0, height: float = 0) -> Self:
|
||||||
"""
|
"""
|
||||||
压缩图片
|
压缩图片
|
||||||
|
|
||||||
@ -451,13 +453,13 @@ class BuildImage:
|
|||||||
if not width and not height:
|
if not width and not height:
|
||||||
width = int(self.width * ratio)
|
width = int(self.width * ratio)
|
||||||
height = int(self.height * ratio)
|
height = int(self.height * ratio)
|
||||||
self.markImg = self.markImg.resize((width, height), Image.LANCZOS) # type: ignore
|
self.markImg = self.markImg.resize((int(width), int(height)), Image.LANCZOS) # type: ignore
|
||||||
self.width, self.height = self.markImg.size
|
self.width, self.height = self.markImg.size
|
||||||
self.draw = ImageDraw.Draw(self.markImg)
|
self.draw = ImageDraw.Draw(self.markImg)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@run_sync
|
@run_sync
|
||||||
def crop(self, box: tuple[int, int, int, int]) -> Self:
|
def crop(self, box: tuple[float, float, float, float]) -> Self:
|
||||||
"""
|
"""
|
||||||
裁剪图片
|
裁剪图片
|
||||||
|
|
||||||
@ -580,7 +582,7 @@ class BuildImage:
|
|||||||
@run_sync
|
@run_sync
|
||||||
def line(
|
def line(
|
||||||
self,
|
self,
|
||||||
xy: tuple[int, int, int, int],
|
xy: tuple[float, float, float, float],
|
||||||
fill: tuple[int, int, int] | str = "#D8DEE4",
|
fill: tuple[int, int, int] | str = "#D8DEE4",
|
||||||
width: int = 1,
|
width: int = 1,
|
||||||
) -> Self:
|
) -> Self:
|
||||||
|
|||||||
@ -321,8 +321,7 @@ class BuildMat:
|
|||||||
if not self.build_data.y_index:
|
if not self.build_data.y_index:
|
||||||
"""没有指定y_index时,使用data自动生成"""
|
"""没有指定y_index时,使用data自动生成"""
|
||||||
max_num = max(self.build_data.data)
|
max_num = max(self.build_data.data)
|
||||||
if max_num < 5:
|
max_num = max(max_num, 5)
|
||||||
max_num = 5
|
|
||||||
s = int(max_num / 5)
|
s = int(max_num / 5)
|
||||||
_y_index = [max_num]
|
_y_index = [max_num]
|
||||||
for _n in range(4):
|
for _n in range(4):
|
||||||
@ -334,23 +333,20 @@ class BuildMat:
|
|||||||
# _tmp = ["_" for _ in range(len(_y_index) - 1)]
|
# _tmp = ["_" for _ in range(len(_y_index) - 1)]
|
||||||
# _tmp.append(str(_y_index[0]))
|
# _tmp.append(str(_y_index[0]))
|
||||||
# _y_index = _tmp
|
# _y_index = _tmp
|
||||||
self.build_data.y_index = _y_index # type: ignore
|
self.build_data.y_index = _y_index
|
||||||
for item in self.build_data.y_index:
|
for item in self.build_data.y_index:
|
||||||
text_size = BuildImage.get_text_size(str(item), font)
|
text_size = BuildImage.get_text_size(str(item), font)
|
||||||
if text_size[0] > padding_width:
|
if text_size[0] > padding_width:
|
||||||
padding_width = text_size[0]
|
padding_width = text_size[0]
|
||||||
y_height_list.append(text_size)
|
y_height_list.append(text_size)
|
||||||
if self.build_data.mat_type == MatType.BARH:
|
if self.build_data.mat_type == MatType.BARH:
|
||||||
_tmp = x_width_list
|
x_width_list, y_height_list = y_height_list, x_width_list
|
||||||
x_width_list = y_height_list
|
|
||||||
y_height_list = _tmp
|
|
||||||
old_space = self.build_data.space
|
old_space = self.build_data.space
|
||||||
width = padding_width * 2 + self.build_data.space[0] * 2 + 20
|
width = padding_width * 2 + self.build_data.space[0] * 2 + 20
|
||||||
height = (
|
height = (
|
||||||
sum([h[1] + self.build_data.space[1] for h in y_height_list])
|
sum(h[1] + self.build_data.space[1] for h in y_height_list)
|
||||||
+ self.build_data.space[1] * 2
|
+ self.build_data.space[1] * 2
|
||||||
+ 30
|
) + 30
|
||||||
)
|
|
||||||
_x_index = self.build_data.x_index
|
_x_index = self.build_data.x_index
|
||||||
_y_index = self.build_data.y_index
|
_y_index = self.build_data.y_index
|
||||||
_barh_max_text_width = 0
|
_barh_max_text_width = 0
|
||||||
@ -376,7 +372,7 @@ class BuildMat:
|
|||||||
width += self.build_data.space[0] * (len(_x_index) - 1)
|
width += self.build_data.space[0] * (len(_x_index) - 1)
|
||||||
else:
|
else:
|
||||||
"""非横向柱状图时加字体宽度"""
|
"""非横向柱状图时加字体宽度"""
|
||||||
width += sum([w[0] + self.build_data.space[0] for w in x_width_list])
|
width += sum(w[0] + self.build_data.space[0] for w in x_width_list)
|
||||||
|
|
||||||
A = BuildImage(
|
A = BuildImage(
|
||||||
width + 5,
|
width + 5,
|
||||||
|
|||||||
@ -229,8 +229,8 @@ class ImageTemplate:
|
|||||||
async def __build_text_image(
|
async def __build_text_image(
|
||||||
cls,
|
cls,
|
||||||
text: str,
|
text: str,
|
||||||
width: int,
|
width: float,
|
||||||
height: int,
|
height: float,
|
||||||
font: FreeTypeFont,
|
font: FreeTypeFont,
|
||||||
font_color: str | tuple[int, int, int] = (0, 0, 0),
|
font_color: str | tuple[int, int, int] = (0, 0, 0),
|
||||||
color: str | tuple[int, int, int] = (255, 255, 255),
|
color: str | tuple[int, int, int] = (255, 255, 255),
|
||||||
|
|||||||
@ -98,7 +98,7 @@ def get_client() -> AsyncClient:
|
|||||||
|
|
||||||
|
|
||||||
def get_async_client(
|
def get_async_client(
|
||||||
proxies: dict[str, str] | None = None,
|
proxies: dict[str, str] | str | None = None,
|
||||||
proxy: str | None = None,
|
proxy: str | None = None,
|
||||||
verify: bool = False,
|
verify: bool = False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
@ -109,6 +109,8 @@ def get_async_client(
|
|||||||
"""
|
"""
|
||||||
transport = kwargs.pop("transport", None) or AsyncHTTPTransport(verify=verify)
|
transport = kwargs.pop("transport", None) or AsyncHTTPTransport(verify=verify)
|
||||||
if proxies:
|
if proxies:
|
||||||
|
if isinstance(proxies, str):
|
||||||
|
proxies = {"http://": proxies, "https://": proxies}
|
||||||
http_proxy = proxies.get("http://")
|
http_proxy = proxies.get("http://")
|
||||||
https_proxy = proxies.get("https://")
|
https_proxy = proxies.get("https://")
|
||||||
return httpx.AsyncClient(
|
return httpx.AsyncClient(
|
||||||
|
|||||||
@ -65,13 +65,13 @@ async def text2image(
|
|||||||
top_padding = padding[0]
|
top_padding = padding[0]
|
||||||
left_padding = padding[1]
|
left_padding = padding[1]
|
||||||
_font = BuildImage.load_font(font, font_size)
|
_font = BuildImage.load_font(font, font_size)
|
||||||
|
image_list = []
|
||||||
if auto_parse and re.search(r"<f(.*)>(.*)</f>", text):
|
if auto_parse and re.search(r"<f(.*)>(.*)</f>", text):
|
||||||
_data = []
|
_data = []
|
||||||
new_text = ""
|
new_text = ""
|
||||||
placeholder_index = 0
|
placeholder_index = 0
|
||||||
for s in text.split("</f>"):
|
for s in text.split("</f>"):
|
||||||
r = re.search(r"<f(.*)>(.*)", s)
|
if r := re.search(r"<f(.*)>(.*)", s):
|
||||||
if r:
|
|
||||||
start, end = r.span()
|
start, end = r.span()
|
||||||
if start != 0 and (t := s[:start]):
|
if start != 0 and (t := s[:start]):
|
||||||
new_text += t
|
new_text += t
|
||||||
@ -79,14 +79,13 @@ async def text2image(
|
|||||||
[
|
[
|
||||||
(start, end),
|
(start, end),
|
||||||
f"[placeholder_{placeholder_index}]",
|
f"[placeholder_{placeholder_index}]",
|
||||||
r.group(1).strip(),
|
r[1].strip(),
|
||||||
r.group(2),
|
r[2],
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
new_text += f"[placeholder_{placeholder_index}]"
|
new_text += f"[placeholder_{placeholder_index}]"
|
||||||
placeholder_index += 1
|
placeholder_index += 1
|
||||||
new_text += text.split("</f>")[-1]
|
new_text += text.split("</f>")[-1]
|
||||||
image_list = []
|
|
||||||
current_placeholder_index = 0
|
current_placeholder_index = 0
|
||||||
# 切分换行,每行为单张图片
|
# 切分换行,每行为单张图片
|
||||||
for s in new_text.split("\n"):
|
for s in new_text.split("\n"):
|
||||||
@ -97,12 +96,9 @@ async def text2image(
|
|||||||
for _ in range(s.count("[placeholder_")):
|
for _ in range(s.count("[placeholder_")):
|
||||||
placeholder = _data[_tmp_index]
|
placeholder = _data[_tmp_index]
|
||||||
if "font_size" in placeholder[2]:
|
if "font_size" in placeholder[2]:
|
||||||
r = re.search(r"font_size=['\"]?(\d+)", placeholder[2])
|
if r := re.search(r"font_size=['\"]?(\d+)", placeholder[2]):
|
||||||
if r:
|
w, h = BuildImage.get_text_size(placeholder[3], font, int(r[1]))
|
||||||
w, h = BuildImage.get_text_size(
|
img_height = max(img_height, h)
|
||||||
placeholder[3], font, int(r.group(1))
|
|
||||||
)
|
|
||||||
img_height = img_height if img_height > h else h
|
|
||||||
img_width += w
|
img_width += w
|
||||||
else:
|
else:
|
||||||
img_width += BuildImage.get_text_size(placeholder[3], _font)[0]
|
img_width += BuildImage.get_text_size(placeholder[3], _font)[0]
|
||||||
@ -135,10 +131,8 @@ async def text2image(
|
|||||||
_font = e.split("=")[-1]
|
_font = e.split("=")[-1]
|
||||||
if e.startswith("font_size=") or e.startswith("fs="):
|
if e.startswith("font_size=") or e.startswith("fs="):
|
||||||
_font_size = int(e.split("=")[-1])
|
_font_size = int(e.split("=")[-1])
|
||||||
if _font_size > 1000:
|
_font_size = min(_font_size, 1000)
|
||||||
_font_size = 1000
|
_font_size = max(_font_size, 1)
|
||||||
if _font_size < 1:
|
|
||||||
_font_size = 1
|
|
||||||
if e.startswith("font_color") or e.startswith("fc="):
|
if e.startswith("font_color") or e.startswith("fc="):
|
||||||
_font_color = e.split("=")[-1]
|
_font_color = e.split("=")[-1]
|
||||||
text_img = await BuildImage.build_text_image(
|
text_img = await BuildImage.build_text_image(
|
||||||
@ -167,7 +161,7 @@ async def text2image(
|
|||||||
width = 0
|
width = 0
|
||||||
for img in image_list:
|
for img in image_list:
|
||||||
height += img.h
|
height += img.h
|
||||||
width = width if width > img.w else img.w
|
width = max(width, img.w)
|
||||||
width += pw
|
width += pw
|
||||||
height += ph
|
height += ph
|
||||||
A = BuildImage(width + left_padding, height + top_padding, color=color)
|
A = BuildImage(width + left_padding, height + top_padding, color=color)
|
||||||
@ -179,12 +173,11 @@ async def text2image(
|
|||||||
width = 0
|
width = 0
|
||||||
height = 0
|
height = 0
|
||||||
_, h = BuildImage.get_text_size("正", _font)
|
_, h = BuildImage.get_text_size("正", _font)
|
||||||
line_height = int(font_size / 3)
|
line_height = font_size // 3
|
||||||
image_list = []
|
|
||||||
for s in text.split("\n"):
|
for s in text.split("\n"):
|
||||||
w, _ = BuildImage.get_text_size(s.strip() or "正", _font)
|
w, _ = BuildImage.get_text_size(s.strip() or "正", _font)
|
||||||
height += h + line_height
|
height += h + line_height
|
||||||
width = width if width > w else w
|
width = max(width, w)
|
||||||
image_list.append(
|
image_list.append(
|
||||||
await BuildImage.build_text_image(
|
await BuildImage.build_text_image(
|
||||||
s.strip(), font, font_size, font_color
|
s.strip(), font, font_size, font_color
|
||||||
@ -205,7 +198,7 @@ async def text2image(
|
|||||||
return A
|
return A
|
||||||
|
|
||||||
|
|
||||||
def group_image(image_list: list[BuildImage]) -> tuple[list[list[BuildImage]], int]:
|
def group_image(image_list: list[BuildImage]) -> tuple[list[list[BuildImage]], float]:
|
||||||
"""
|
"""
|
||||||
说明:
|
说明:
|
||||||
根据图片大小进行分组
|
根据图片大小进行分组
|
||||||
@ -240,7 +233,7 @@ def group_image(image_list: list[BuildImage]) -> tuple[list[list[BuildImage]], i
|
|||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
total_w += max([x.width for x in group]) + 15
|
total_w += max(x.width for x in group) + 15
|
||||||
image_group.append(group)
|
image_group.append(group)
|
||||||
while surplus_list:
|
while surplus_list:
|
||||||
surplus_list = [x for x in surplus_list if x.uid not in is_use]
|
surplus_list = [x for x in surplus_list if x.uid not in is_use]
|
||||||
@ -252,7 +245,7 @@ def group_image(image_list: list[BuildImage]) -> tuple[list[list[BuildImage]], i
|
|||||||
_w = 0
|
_w = 0
|
||||||
index = -1
|
index = -1
|
||||||
for i, ig in enumerate(image_group):
|
for i, ig in enumerate(image_group):
|
||||||
if s := sum([x.height for x in ig]) > _w:
|
if s := sum(x.height for x in ig) > _w:
|
||||||
_w = s
|
_w = s
|
||||||
index = i
|
index = i
|
||||||
if index != -1:
|
if index != -1:
|
||||||
@ -262,29 +255,29 @@ def group_image(image_list: list[BuildImage]) -> tuple[list[list[BuildImage]], i
|
|||||||
max_h = 0
|
max_h = 0
|
||||||
max_w = 0
|
max_w = 0
|
||||||
for ig in image_group:
|
for ig in image_group:
|
||||||
if (_h := sum([x.height + 15 for x in ig])) > max_h:
|
if (_h := sum(x.height + 15 for x in ig)) > max_h:
|
||||||
max_h = _h
|
max_h = _h
|
||||||
max_w += max([x.width for x in ig]) + 30
|
max_w += max(x.width for x in ig) + 30
|
||||||
is_use.clear()
|
is_use.clear()
|
||||||
while abs(max_h - max_w) > 200 and len(image_group) - 1 >= len(image_group[-1]):
|
while abs(max_h - max_w) > 200 and len(image_group) - 1 >= len(image_group[-1]):
|
||||||
for img in image_group[-1]:
|
for img in image_group[-1]:
|
||||||
_min_h = 999999
|
_min_h = 999999
|
||||||
_min_index = -1
|
_min_index = -1
|
||||||
for i, ig in enumerate(image_group):
|
for i, ig in enumerate(image_group):
|
||||||
if (_h := sum([x.height for x in ig]) + img.height) < _min_h:
|
if (_h := sum(x.height for x in ig) + img.height) < _min_h:
|
||||||
_min_h = _h
|
_min_h = _h
|
||||||
_min_index = i
|
_min_index = i
|
||||||
is_use.append(_min_index)
|
is_use.append(_min_index)
|
||||||
image_group[_min_index].append(img)
|
image_group[_min_index].append(img)
|
||||||
max_w -= max([x.width for x in image_group[-1]]) - 30
|
max_w -= max(x.width for x in image_group[-1]) - 30
|
||||||
image_group.pop(-1)
|
image_group.pop(-1)
|
||||||
max_h = max([sum([x.height + 15 for x in ig]) for ig in image_group])
|
max_h = max(sum(x.height + 15 for x in ig) for ig in image_group)
|
||||||
return image_group, max(max_h + 250, max_w + 70)
|
return image_group, max(max_h + 250, max_w + 70)
|
||||||
|
|
||||||
|
|
||||||
async def build_sort_image(
|
async def build_sort_image(
|
||||||
image_group: list[list[BuildImage]],
|
image_group: list[list[BuildImage]],
|
||||||
h: int | None = None,
|
h: float | None = None,
|
||||||
padding_top: int = 200,
|
padding_top: int = 200,
|
||||||
color: ColorAlias = (
|
color: ColorAlias = (
|
||||||
255,
|
255,
|
||||||
@ -307,16 +300,15 @@ async def build_sort_image(
|
|||||||
"""
|
"""
|
||||||
bk_file = None
|
bk_file = None
|
||||||
if background_path:
|
if background_path:
|
||||||
random_bk = os.listdir(background_path)
|
if random_bk := os.listdir(background_path):
|
||||||
if random_bk:
|
|
||||||
bk_file = random.choice(random_bk)
|
bk_file = random.choice(random_bk)
|
||||||
image_w = 0
|
image_w = 0
|
||||||
image_h = 0
|
image_h = 0
|
||||||
if not h:
|
if not h:
|
||||||
for ig in image_group:
|
for ig in image_group:
|
||||||
_w = max([x.width + 30 for x in ig])
|
_w = max(x.width + 30 for x in ig)
|
||||||
image_w += _w + 30
|
image_w += _w + 30
|
||||||
_h = sum([x.height + 10 for x in ig])
|
_h = sum(x.height + 10 for x in ig)
|
||||||
if _h > image_h:
|
if _h > image_h:
|
||||||
image_h = _h
|
image_h = _h
|
||||||
image_h += padding_top
|
image_h += padding_top
|
||||||
@ -342,7 +334,7 @@ async def build_sort_image(
|
|||||||
for img in ig:
|
for img in ig:
|
||||||
await A.paste(img, (curr_w, curr_h))
|
await A.paste(img, (curr_w, curr_h))
|
||||||
curr_h += img.height + 10
|
curr_h += img.height + 10
|
||||||
curr_w += max([x.width for x in ig]) + 30
|
curr_w += max(x.width for x in ig) + 30
|
||||||
return A
|
return A
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -64,6 +64,25 @@ class ResourceDirManager:
|
|||||||
cls.__tree_append(path, deep)
|
cls.__tree_append(path, deep)
|
||||||
|
|
||||||
|
|
||||||
|
def is_binary_file(file_path: str) -> bool:
|
||||||
|
"""判断是否为二进制文件"""
|
||||||
|
binary_extensions = {
|
||||||
|
".jpg",
|
||||||
|
".jpeg",
|
||||||
|
".png",
|
||||||
|
".gif",
|
||||||
|
".bmp",
|
||||||
|
".ico",
|
||||||
|
".pdf",
|
||||||
|
".zip",
|
||||||
|
".rar",
|
||||||
|
".7z",
|
||||||
|
".exe",
|
||||||
|
".dll",
|
||||||
|
}
|
||||||
|
return any(file_path.lower().endswith(ext) for ext in binary_extensions)
|
||||||
|
|
||||||
|
|
||||||
def cn2py(word: str) -> str:
|
def cn2py(word: str) -> str:
|
||||||
"""将字符串转化为拼音
|
"""将字符串转化为拼音
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user