zhenxun_bot/zhenxun/utils/image_utils.py
HibiKier 4e33bf3a50
版本更新 (#1666)
*  父级插件加载

*  添加测试:更新与添加插件 (#1594)

*  测试更新与添加插件

*  Sourcery建议

* 👷 添加pytest

* 🎨 优化代码

* 🐛 bug修复

* 🐛修复添加插件返回403的问题 (#1595)

* 完善测试方法
* vscode测试配置
* 重构插件安装过程

* 🎨 修改readme

* Update README.md

* 🐛 修改bug与版本锁定

* 🐛 修复超级用户对群组功能开关

* 🐛 修复插件商店检查插件更新问题 (#1597)

* 🐛 修复插件商店检查插件更新问题

* 🐛 恶意命令检测问题

* 🐛 增加插件状态检查 (#1598)

*  优化测试用例

* 🐛 更改插件更新与安装逻辑

* 🐛 修复更新群组成员信息

* 🎨 代码优化

* 🚀 更新Dockerfile (#1599)

* 🎨 更新requirements

*  添加依赖aiocache

*  添加github镜像

*  添加仓库目录多获取渠道

* 🐛 修复测试用例

*  添加API缓存

* 🎨 采取Sourcery建议

* 🐛 文件下载逻辑修改

* 🎨 优化代码

* 🐛 修复插件开关有时出现错误

*  重构自检ui

* 🐛 自检html修正

* 修复签到逻辑bug,并使代码更灵活以适应签到好感度等级配置 (#1606)

* 修复签到功能已知问题

* 修复签到功能已知问题

* 修改参数名称

* 修改uid判断

---------

Co-authored-by: HibiKier <45528451+HibiKier@users.noreply.github.com>

* 🎨 代码结构优化

* 🐛 私聊时修改插件时删除私聊帮助

* 🐛 过滤父插件

* 🐛 修复自检在ARM上的问题 (#1607)

* 🐛 修复自检在ARM上的问题

*  优化测试

*  支持mysql,psql,sqlite随机函数

* 🔧 VSCode配置修改

* 🔧 VSCode配置修改

*  添加金币排行

Co-Authored-By: HibiKier <45528451+HibiKier@users.noreply.github.com>

* 📝 修改README

Co-Authored-By: HibiKier <45528451+HibiKier@users.noreply.github.com>

* 🔨 提取GitHub相关操作 (#1609)

* 🔨 提取GitHub相关操作

* 🔨 重构API策略

*  签到/金币排行限制最大数量 (#1616)

*  签到/金币排行限制最大数量

* 🐛 修复超级用户id获取问题

* 🐛 修复路径解压与挂载 (#1619)

* 🐛 修复功能少时zhenxun帮助图片排序问题 (#1620)

* 🐛 签到文本适应 (#1622)

* 🐛 好感度排行提供默认值 (#1624)

* 🎈 优先使用github api (#1625)

*  重构帮助,限制普通用户查询管理插件 (#1626)

* 🐛 修复群权限与插件等级匹配 (#1627)

*  当管理员尝试ban真寻时将被反杀 (#1628)

*  群组发言时间检测提供开关配置 (#1630)

* 🐳 chore: 支持自动修改版本号 (#1629)

* 🎈 perf(github_utils): 支持github url下载遍历 (#1632)

* 🎈 perf(github_utils): 支持github url下载遍历

* 🐞 fix(http_utils): 修复一些下载问题

* 🦄 refactor(http_utils): 部分重构

* chore(version): Update version to v0.2.2-e6f17c4

---------

Co-authored-by: AkashiCoin <AkashiCoin@users.noreply.github.com>

* 🧪 test(auto_update): 修复测试用例 (#1633)

* 🐛 修复商店商品为空时报错 (#1634)

* 🐛 修复群权限与插件等级匹配 (#1635)

*  message_build支持AtAll (#1639)

* 🎈 perf: 使用commit号下载插件 (#1641)

* 🎈 perf: 使用commit号下载插件

* chore(version): Update version to v0.2.2-f9c7360

---------

Co-authored-by: AkashiCoin <AkashiCoin@users.noreply.github.com>

* 🐳 chore: 修改运行检查触发路径 (#1642)

* 🐳 chore: 修改运行检查触发路径

* 🐳 chore: 添加tests目录

*  重构qq群事件处理 (#1643)

* 🐛 签到名称自适应 (#1644)

* 🎨  更新README (#1645)

* 🐛 fix(http_utils): 流式下载Content-Length错误 (#1647)

* 🐛 修复群组中帮助功能状态显示问题 (#1650)

* 🐛 修复群欢迎消息设置 (#1651)

* 🐛 修复webui下载后首次启动错误 (#1652)

* 🐛 修复webui下载后首次启动错误

* chore(version): Update version to v0.2.2-4a8ef85

---------

Co-authored-by: HibiKier <HibiKier@users.noreply.github.com>

*  移除默认图片文件夹:爬 (#1653)

*  安装/移除插件提供插件安装/卸载方法用于插件初始化 (#1654)

*  新增超级用户与管理员帮助模板 (#1655)

*  新增个人信息命令 (#1657)

*  修改个人信息菜单名称 (#1658)

*  新增插件商店api (#1659)

*  新增插件商店api

* chore(version): Update version to v0.2.2-7e15f20

---------

Co-authored-by: HibiKier <HibiKier@users.noreply.github.com>

*  将cd,block,count限制复原配置文件 (#1662)

* 🎨 修改README (#1663)

* 🎨 修改版本号 (#1664)

* 🎨 修改requirements (#1665)

---------

Co-authored-by: AkashiCoin <l1040186796@gmail.com>
Co-authored-by: fanyinrumeng <42991257+fanyinrumeng@users.noreply.github.com>
Co-authored-by: AkashiCoin <i@loli.vet>
Co-authored-by: Elaga <1728903318@qq.com>
Co-authored-by: AkashiCoin <AkashiCoin@users.noreply.github.com>
Co-authored-by: HibiKier <HibiKier@users.noreply.github.com>
2024-10-01 00:42:23 +08:00

423 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import random
from io import BytesIO
from pathlib import Path
from collections.abc import Callable, Awaitable
import cv2
import imagehash
from PIL import Image
from nonebot.utils import is_coroutine_callable
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.configs.path_config import TEMP_PATH, IMAGE_PATH
from ._build_image import BuildImage, ColorAlias
from ._build_mat import MatType, BuildMat # noqa: F401
from ._image_template import RowStyle, ImageTemplate # noqa: F401
# TODO: text2image 长度错误
async def text2image(
text: str,
auto_parse: bool = True,
font_size: int = 20,
color: str | tuple[int, int, int] = (255, 255, 255),
font: str = "HYWenHei-85W.ttf",
font_color: str | tuple[int, int, int] = (0, 0, 0),
padding: int | tuple[int, int, int, int] = 0,
_add_height: float = 0,
) -> BuildImage:
"""解析文本并转为图片
使用标签
<f> </f>
可选配置项
font: str -> 特殊文本字体
fs / font_size: int -> 特殊文本大小
fc / font_color: Union[str, Tuple[int, int, int]] -> 特殊文本颜色
示例
在不在,<f font=YSHaoShenTi-2.ttf font_size=30 font_color=red>HibiKi小姐</f>
你最近还好吗,<f font_size=15 font_color=black>我非常想你</f>,这段时间我非常不好过,
<f font_size=25>抽卡抽不到金色</f>,这让我很痛苦
参数:
text: 文本
auto_parse: 是否自动解析,否则原样发送
font_size: 普通字体大小
color: 背景颜色
font: 普通字体
font_color: 普通字体颜色
padding: 文本外边距,元组类型时为 (上,左,下,右)
_add_height: 由于get_size无法返回正确的高度采用手动方式额外添加高度
"""
if not text:
raise ValueError("文本转图片 text 不能为空...")
pw = ph = top_padding = left_padding = 0
if padding:
if isinstance(padding, int):
pw = padding * 2
ph = padding * 2
top_padding = left_padding = padding
elif isinstance(padding, tuple):
pw = padding[0] + padding[2]
ph = padding[1] + padding[3]
top_padding = padding[0]
left_padding = padding[1]
_font = BuildImage.load_font(font, font_size)
if auto_parse and re.search(r"<f(.*)>(.*)</f>", text):
_data = []
new_text = ""
placeholder_index = 0
for s in text.split("</f>"):
r = re.search(r"<f(.*)>(.*)", s)
if r:
start, end = r.span()
if start != 0 and (t := s[:start]):
new_text += t
_data.append(
[
(start, end),
f"[placeholder_{placeholder_index}]",
r.group(1).strip(),
r.group(2),
]
)
new_text += f"[placeholder_{placeholder_index}]"
placeholder_index += 1
new_text += text.split("</f>")[-1]
image_list = []
current_placeholder_index = 0
# 切分换行,每行为单张图片
for s in new_text.split("\n"):
_tmp_text = s
img_width = 0
img_height = BuildImage.get_text_size("", _font)[1]
_tmp_index = current_placeholder_index
for _ in range(s.count("[placeholder_")):
placeholder = _data[_tmp_index]
if "font_size" in placeholder[2]:
r = re.search(r"font_size=['\"]?(\d+)", placeholder[2])
if r:
w, h = BuildImage.get_text_size(
placeholder[3], font, int(r.group(1))
)
img_height = img_height if img_height > h else h
img_width += w
else:
img_width += BuildImage.get_text_size(placeholder[3], _font)[0]
_tmp_text = _tmp_text.replace(f"[placeholder_{_tmp_index}]", "")
_tmp_index += 1
img_width += BuildImage.get_text_size(_tmp_text, _font)[0]
# 开始画图
A = BuildImage(
img_width, img_height, color=color, font=font, font_size=font_size
)
basic_font_h = A.getsize("")[1]
current_width = 0
# 遍历占位符
for _ in range(s.count("[placeholder_")):
if not s.startswith(f"[placeholder_{current_placeholder_index}]"):
slice_ = s.split(f"[placeholder_{current_placeholder_index}]")
await A.text(
(current_width, A.height - basic_font_h - 1),
slice_[0],
font_color,
)
current_width += A.getsize(slice_[0])[0]
placeholder = _data[current_placeholder_index]
# 解析配置
_font = font
_font_size = font_size
_font_color = font_color
for e in placeholder[2].split():
if e.startswith("font="):
_font = e.split("=")[-1]
if e.startswith("font_size=") or e.startswith("fs="):
_font_size = int(e.split("=")[-1])
if _font_size > 1000:
_font_size = 1000
if _font_size < 1:
_font_size = 1
if e.startswith("font_color") or e.startswith("fc="):
_font_color = e.split("=")[-1]
text_img = await BuildImage.build_text_image(
placeholder[3], font=_font, size=_font_size, font_color=_font_color
)
_img_h = (
int(A.height / 2 - text_img.height / 2)
if new_text == "[placeholder_0]"
else A.height - text_img.height
)
await A.paste(text_img, (current_width, _img_h - 1))
current_width += text_img.width
s = s[
s.index(f"[placeholder_{current_placeholder_index}]")
+ len(f"[placeholder_{current_placeholder_index}]") :
]
current_placeholder_index += 1
if s:
slice_ = s.split(f"[placeholder_{current_placeholder_index}]")
await A.text((current_width, A.height - basic_font_h), slice_[0])
current_width += A.getsize(slice_[0])[0]
await A.crop((0, 0, current_width, A.height))
# A.show()
image_list.append(A)
height = 0
width = 0
for img in image_list:
height += img.h
width = width if width > img.w else img.w
width += pw
height += ph
A = BuildImage(width + left_padding, height + top_padding, color=color)
current_height = top_padding
for img in image_list:
await A.paste(img, (left_padding, current_height))
current_height += img.h
else:
width = 0
height = 0
_, h = BuildImage.get_text_size("", _font)
line_height = int(font_size / 3)
image_list = []
for s in text.split("\n"):
w, _ = BuildImage.get_text_size(s.strip() or "", _font)
height += h + line_height
width = width if width > w else w
image_list.append(
await BuildImage.build_text_image(
s.strip(), font, font_size, font_color
)
)
height = sum(img.height + 8 for img in image_list) + pw
width += pw
# height += ph
A = BuildImage(
width + left_padding,
height + top_padding + 2,
color=color,
)
cur_h = ph
for img in image_list:
await A.paste(img, (pw, cur_h))
cur_h += img.height + line_height
return A
def group_image(image_list: list[BuildImage]) -> tuple[list[list[BuildImage]], int]:
"""
说明:
根据图片大小进行分组
参数:
image_list: 排序图片列表
"""
image_list.sort(key=lambda x: x.height, reverse=True)
max_image = max(image_list, key=lambda x: x.height)
image_list.remove(max_image)
max_h = max_image.height
total_w = 0
# 图片分组
image_group = [[max_image]]
is_use = []
surplus_list = image_list[:]
for image in image_list:
if image.uid not in is_use:
group = [image]
is_use.append(image.uid)
curr_h = image.height
while True:
surplus_list = [x for x in surplus_list if x.uid not in is_use]
for tmp in surplus_list:
temp_h = curr_h + tmp.height + 10
if temp_h < max_h or abs(max_h - temp_h) < 100:
curr_h += tmp.height + 15
is_use.append(tmp.uid)
group.append(tmp)
break
else:
break
total_w += max([x.width for x in group]) + 15
image_group.append(group)
while surplus_list:
surplus_list = [x for x in surplus_list if x.uid not in is_use]
if not surplus_list:
break
surplus_list.sort(key=lambda x: x.height, reverse=True)
for img in surplus_list:
if img.uid not in is_use:
_w = 0
index = -1
for i, ig in enumerate(image_group):
if s := sum([x.height for x in ig]) > _w:
_w = s
index = i
if index != -1:
image_group[index].append(img)
is_use.append(img.uid)
max_h = 0
max_w = 0
for ig in image_group:
if (_h := sum([x.height + 15 for x in ig])) > max_h:
max_h = _h
max_w += max([x.width for x in ig]) + 30
is_use.clear()
while abs(max_h - max_w) > 200 and len(image_group) - 1 >= len(image_group[-1]):
for img in image_group[-1]:
_min_h = 999999
_min_index = -1
for i, ig in enumerate(image_group):
# if i not in is_use and (_h := sum([x.h for x in ig]) + img.h) > _min_h:
if (_h := sum([x.height for x in ig]) + img.height) < _min_h:
_min_h = _h
_min_index = i
is_use.append(_min_index)
image_group[_min_index].append(img)
max_w -= max([x.width for x in image_group[-1]]) - 30
image_group.pop(-1)
max_h = max([sum([x.height + 15 for x in ig]) for ig in image_group])
return image_group, max(max_h + 250, max_w + 70)
async def build_sort_image(
image_group: list[list[BuildImage]],
h: int | None = None,
padding_top: int = 200,
color: ColorAlias = (
255,
255,
255,
),
background_path: Path | None = None,
background_handle: Callable[[BuildImage], Awaitable] | None = None,
) -> BuildImage:
"""
说明:
对group_image的图片进行组装
参数:
image_group: 分组图片列表
h: max(宽,高)一般为group_image的返回值有值时图片必定为正方形
padding_top: 图像列表与最顶层间距
color: 背景颜色
background_path: 背景图片文件夹路径(随机)
background_handle: 背景图额外操作
"""
bk_file = None
if background_path:
random_bk = os.listdir(background_path)
if random_bk:
bk_file = random.choice(random_bk)
image_w = 0
image_h = 0
if not h:
for ig in image_group:
_w = max([x.width + 30 for x in ig])
image_w += _w + 30
_h = sum([x.height + 10 for x in ig])
if _h > image_h:
image_h = _h
image_h += padding_top
else:
image_w = h
image_h = h
A = BuildImage(
image_w,
image_h,
font_size=24,
font="CJGaoDeGuo.otf",
color=color,
background=(background_path / bk_file) if background_path and bk_file else None,
)
if background_handle:
if is_coroutine_callable(background_handle):
await background_handle(A)
else:
background_handle(A)
curr_w = 50
for ig in image_group:
curr_h = padding_top - 20
for img in ig:
await A.paste(img, (curr_w, curr_h))
curr_h += img.height + 10
curr_w += max([x.width for x in ig]) + 30
return A
def compressed_image(
in_file: str | Path,
out_file: str | Path | None = None,
ratio: float = 0.9,
):
"""压缩图片
参数:
in_file: 被压缩的文件路径
out_file: 压缩后输出的文件路径
ratio: 压缩率,宽高 * 压缩率
"""
in_file = IMAGE_PATH / in_file if isinstance(in_file, str) else in_file
if out_file:
out_file = IMAGE_PATH / out_file if isinstance(out_file, str) else out_file
else:
out_file = in_file
h, w, d = cv2.imread(str(in_file.absolute())).shape
img = cv2.resize(
cv2.imread(str(in_file.absolute())), (int(w * ratio), int(h * ratio))
)
cv2.imwrite(str(out_file.absolute()), img)
def get_img_hash(image_file: str | Path) -> str:
"""获取图片的hash值
参数:
image_file: 图片文件路径
返回:
str: 哈希值
"""
hash_value = ""
try:
with open(image_file, "rb") as fp:
hash_value = imagehash.average_hash(Image.open(fp))
except Exception as e:
logger.warning("获取图片Hash出错", "禁言检测", e=e)
return str(hash_value)
async def get_download_image_hash(url: str, mark: str) -> str:
"""下载图片获取哈希值
参数:
url: 图片url
mark: 随机标志符
返回:
str: 哈希值
"""
try:
if await AsyncHttpx.download_file(
url, TEMP_PATH / f"compare_download_{mark}_img.jpg"
):
img_hash = get_img_hash(TEMP_PATH / f"compare_download_{mark}_img.jpg")
return str(img_hash)
except Exception as e:
logger.warning("下载读取图片Hash出错", e=e)
return ""
def pic2bytes(image) -> bytes:
"""获取bytes
返回:
bytes: bytes
"""
buf = BytesIO()
image.save(buf, format="PNG")
return buf.getvalue()