mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 06:12:53 +08:00
feat✨: 功能调用统计
This commit is contained in:
parent
2e17f56f1e
commit
2021a2cc1c
@ -54,7 +54,7 @@ from public.bag_users t1
|
||||
"""
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
# @driver.on_startup
|
||||
async def _():
|
||||
global flag
|
||||
await shop_register.load_register()
|
||||
|
||||
29
zhenxun/models/statistics.py
Normal file
29
zhenxun/models/statistics.py
Normal file
@ -0,0 +1,29 @@
|
||||
from tortoise import fields
|
||||
|
||||
from zhenxun.services.db_context import Model
|
||||
|
||||
|
||||
class Statistics(Model):
|
||||
|
||||
id = fields.IntField(pk=True, generated=True, auto_increment=True)
|
||||
"""自增id"""
|
||||
user_id = fields.CharField(255)
|
||||
"""用户id"""
|
||||
group_id = fields.CharField(255, null=True)
|
||||
"""群聊id"""
|
||||
plugin_name = fields.CharField(255)
|
||||
"""插件名称"""
|
||||
create_time = fields.DatetimeField(auto_now=True)
|
||||
"""添加日期"""
|
||||
|
||||
class Meta:
|
||||
table = "statistics"
|
||||
table_description = "插件调用统计数据库"
|
||||
|
||||
@classmethod
|
||||
async def _run_script(cls):
|
||||
return [
|
||||
"ALTER TABLE statistics RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id
|
||||
"ALTER TABLE statistics ALTER COLUMN user_id TYPE character varying(255);",
|
||||
"ALTER TABLE statistics ALTER COLUMN group_id TYPE character varying(255);",
|
||||
]
|
||||
@ -50,15 +50,6 @@ async def get_image_info(mod: str, url: str) -> str | list[Image | Text] | None:
|
||||
return await get_saucenao_image(url)
|
||||
|
||||
|
||||
# def parse_image(key: str):
|
||||
# async def _key_parser(state: T_State, img: Message = Arg(key)):
|
||||
# if not get_message_img(img):
|
||||
# await search_image.reject_arg(key, "请发送要识别的图片!")
|
||||
# state[key] = img
|
||||
|
||||
# return _key_parser
|
||||
|
||||
|
||||
@_matcher.handle()
|
||||
async def _(mode: Match[str], img: Match[alcImg]):
|
||||
if mode.available:
|
||||
|
||||
@ -189,7 +189,7 @@ async def _(
|
||||
if result := SetuManage.get_luo(float(user.impression)):
|
||||
await result.finish()
|
||||
is_r18 = arparma.find("r")
|
||||
_num = num.result
|
||||
_num = num.result if num.available else 1
|
||||
if is_r18 and gid:
|
||||
"""群聊中禁止查看r18"""
|
||||
if not base_config.get("ALLOW_GROUP_R18"):
|
||||
|
||||
131
zhenxun/plugins/statistics/__init__.py
Normal file
131
zhenxun/plugins/statistics/__init__.py
Normal file
@ -0,0 +1,131 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import nonebot
|
||||
import ujson as json
|
||||
|
||||
from zhenxun.configs.path_config import DATA_PATH
|
||||
|
||||
nonebot.load_plugins(str(Path(__file__).parent.resolve()))
|
||||
|
||||
old_file1 = DATA_PATH / "_prefix_count.json"
|
||||
old_file2 = DATA_PATH / "_prefix_user_count.json"
|
||||
new_path = DATA_PATH / "statistics"
|
||||
new_path.mkdir(parents=True, exist_ok=True)
|
||||
if old_file1.exists():
|
||||
os.rename(old_file1, new_path / "_prefix_count.json")
|
||||
if old_file2.exists():
|
||||
os.rename(old_file2, new_path / "_prefix_user_count.json")
|
||||
|
||||
|
||||
# 修改旧数据
|
||||
|
||||
statistics_group_file = DATA_PATH / "statistics" / "_prefix_count.json"
|
||||
statistics_user_file = DATA_PATH / "statistics" / "_prefix_user_count.json"
|
||||
|
||||
for file in [statistics_group_file, statistics_user_file]:
|
||||
if file.exists():
|
||||
with open(file, "r", encoding="utf8") as f:
|
||||
data = json.load(f)
|
||||
if not (statistics_group_file.parent / f"{file}.bak").exists():
|
||||
with open(f"{file}.bak", "w", encoding="utf8") as wf:
|
||||
json.dump(data, wf, ensure_ascii=False, indent=4)
|
||||
for x in ["total_statistics", "day_statistics"]:
|
||||
for key in data[x].keys():
|
||||
num = 0
|
||||
if data[x][key].get("ai") is not None:
|
||||
if data[x][key].get("Ai") is not None:
|
||||
data[x][key]["Ai"] += data[x][key]["ai"]
|
||||
else:
|
||||
data[x][key]["Ai"] = data[x][key]["ai"]
|
||||
del data[x][key]["ai"]
|
||||
if data[x][key].get("抽卡") is not None:
|
||||
if data[x][key].get("游戏抽卡") is not None:
|
||||
data[x][key]["游戏抽卡"] += data[x][key]["抽卡"]
|
||||
else:
|
||||
data[x][key]["游戏抽卡"] = data[x][key]["抽卡"]
|
||||
del data[x][key]["抽卡"]
|
||||
if data[x][key].get("我的道具") is not None:
|
||||
num += data[x][key]["我的道具"]
|
||||
del data[x][key]["我的道具"]
|
||||
if data[x][key].get("使用道具") is not None:
|
||||
num += data[x][key]["使用道具"]
|
||||
del data[x][key]["使用道具"]
|
||||
if data[x][key].get("我的金币") is not None:
|
||||
num += data[x][key]["我的金币"]
|
||||
del data[x][key]["我的金币"]
|
||||
if data[x][key].get("购买") is not None:
|
||||
num += data[x][key]["购买"]
|
||||
del data[x][key]["购买"]
|
||||
if data[x][key].get("商店") is not None:
|
||||
data[x][key]["商店"] += num
|
||||
else:
|
||||
data[x][key]["商店"] = num
|
||||
for x in ["week_statistics", "month_statistics"]:
|
||||
for key in data[x].keys():
|
||||
if key == "total":
|
||||
if data[x][key].get("ai") is not None:
|
||||
if data[x][key].get("Ai") is not None:
|
||||
data[x][key]["Ai"] += data[x][key]["ai"]
|
||||
else:
|
||||
data[x][key]["Ai"] = data[x][key]["ai"]
|
||||
del data[x][key]["ai"]
|
||||
if data[x][key].get("抽卡") is not None:
|
||||
if data[x][key].get("游戏抽卡") is not None:
|
||||
data[x][key]["游戏抽卡"] += data[x][key]["抽卡"]
|
||||
else:
|
||||
data[x][key]["游戏抽卡"] = data[x][key]["抽卡"]
|
||||
del data[x][key]["抽卡"]
|
||||
if data[x][key].get("我的道具") is not None:
|
||||
num += data[x][key]["我的道具"]
|
||||
del data[x][key]["我的道具"]
|
||||
if data[x][key].get("使用道具") is not None:
|
||||
num += data[x][key]["使用道具"]
|
||||
del data[x][key]["使用道具"]
|
||||
if data[x][key].get("我的金币") is not None:
|
||||
num += data[x][key]["我的金币"]
|
||||
del data[x][key]["我的金币"]
|
||||
if data[x][key].get("购买") is not None:
|
||||
num += data[x][key]["购买"]
|
||||
del data[x][key]["购买"]
|
||||
if data[x][key].get("商店") is not None:
|
||||
data[x][key]["商店"] += num
|
||||
else:
|
||||
data[x][key]["商店"] = num
|
||||
else:
|
||||
for day in data[x][key].keys():
|
||||
num = 0
|
||||
if data[x][key][day].get("ai") is not None:
|
||||
if data[x][key][day].get("Ai") is not None:
|
||||
data[x][key][day]["Ai"] += data[x][key][day]["ai"]
|
||||
else:
|
||||
data[x][key][day]["Ai"] = data[x][key][day]["ai"]
|
||||
del data[x][key][day]["ai"]
|
||||
if data[x][key][day].get("抽卡") is not None:
|
||||
if data[x][key][day].get("游戏抽卡") is not None:
|
||||
data[x][key][day]["游戏抽卡"] += data[x][key][day][
|
||||
"抽卡"
|
||||
]
|
||||
else:
|
||||
data[x][key][day]["游戏抽卡"] = data[x][key][day][
|
||||
"抽卡"
|
||||
]
|
||||
del data[x][key][day]["抽卡"]
|
||||
if data[x][key][day].get("我的道具") is not None:
|
||||
num += data[x][key][day]["我的道具"]
|
||||
del data[x][key][day]["我的道具"]
|
||||
if data[x][key][day].get("使用道具") is not None:
|
||||
num += data[x][key][day]["使用道具"]
|
||||
del data[x][key][day]["使用道具"]
|
||||
if data[x][key][day].get("我的金币") is not None:
|
||||
num += data[x][key][day]["我的金币"]
|
||||
del data[x][key][day]["我的金币"]
|
||||
if data[x][key][day].get("购买") is not None:
|
||||
num += data[x][key][day]["购买"]
|
||||
del data[x][key][day]["购买"]
|
||||
if data[x][key][day].get("商店") is not None:
|
||||
data[x][key][day]["商店"] += num
|
||||
else:
|
||||
data[x][key][day]["商店"] = num
|
||||
with open(file, "w", encoding="utf8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||||
131
zhenxun/plugins/statistics/_data_source.py
Normal file
131
zhenxun/plugins/statistics/_data_source.py
Normal file
@ -0,0 +1,131 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from tortoise.functions import Count
|
||||
|
||||
from zhenxun.models.group_console import GroupConsole
|
||||
from zhenxun.models.group_member_info import GroupInfoUser
|
||||
from zhenxun.models.plugin_info import PluginInfo
|
||||
from zhenxun.models.statistics import Statistics
|
||||
from zhenxun.models.user_console import UserConsole
|
||||
from zhenxun.utils.enum import PluginType
|
||||
from zhenxun.utils.image_utils import BuildImage, BuildMat, MatType
|
||||
|
||||
|
||||
class StatisticsManage:
|
||||
|
||||
@classmethod
|
||||
async def get_statistics(
|
||||
cls,
|
||||
plugin_name: str | None,
|
||||
is_global: bool,
|
||||
search_type: str | None,
|
||||
user_id: str | None = None,
|
||||
group_id: str | None = None,
|
||||
):
|
||||
day = None
|
||||
day_type = ""
|
||||
if search_type == "day":
|
||||
day = 1
|
||||
day_type = "日"
|
||||
if search_type == "week":
|
||||
day = 7
|
||||
day_type = "周"
|
||||
if search_type == "month":
|
||||
day = 30
|
||||
day_type = "月"
|
||||
if day_type:
|
||||
day_type += f"({day}天)"
|
||||
title = ""
|
||||
if user_id:
|
||||
"""查用户"""
|
||||
query = GroupInfoUser.filter(user_id=user_id)
|
||||
if group_id:
|
||||
query = query.filter(group_id=group_id)
|
||||
user = await query.first()
|
||||
title = f"{user.user_name if user else user_id} {day_type}功能调用统计"
|
||||
elif group_id:
|
||||
"""查群组"""
|
||||
group = await GroupConsole.get_or_none(
|
||||
group_id=group_id, channel_id__isnull=True
|
||||
)
|
||||
title = f"{group.group_name if group else group_id} {day_type}功能调用统计"
|
||||
else:
|
||||
title = "功能调用统计"
|
||||
if is_global and not user_id:
|
||||
title = "全局 " + title
|
||||
return await cls.get_global_statistics(plugin_name, day, title)
|
||||
if user_id:
|
||||
return await cls.get_my_statistics(user_id, group_id, day, title)
|
||||
if group_id:
|
||||
return await cls.get_group_statistics(group_id, day, title)
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
async def get_global_statistics(
|
||||
cls, plugin_name: str | None, day: int | None, title: str
|
||||
) -> BuildImage | str:
|
||||
query = Statistics
|
||||
if plugin_name:
|
||||
query = query.filter(plugin_name=plugin_name)
|
||||
if day:
|
||||
time = datetime.now() - timedelta(days=day)
|
||||
query = query.filter(create_time__gte=time)
|
||||
data_list = (
|
||||
await query.annotate(count=Count("id"))
|
||||
.group_by("plugin_name")
|
||||
.values_list("plugin_name", "count")
|
||||
)
|
||||
if not data_list:
|
||||
return "统计数据为空..."
|
||||
return await cls.__build_image(data_list, title)
|
||||
|
||||
@classmethod
|
||||
async def get_my_statistics(
|
||||
cls, user_id: str, group_id: str | None, day: int | None, title: str
|
||||
):
|
||||
query = Statistics.filter(user_id=user_id)
|
||||
if group_id:
|
||||
query = query.filter(group_id=group_id)
|
||||
if day:
|
||||
time = datetime.now() - timedelta(days=day)
|
||||
query = query.filter(create_time__gte=time)
|
||||
data_list = (
|
||||
await query.annotate(count=Count("id"))
|
||||
.group_by("plugin_name")
|
||||
.values_list("plugin_name", "count")
|
||||
)
|
||||
if not data_list:
|
||||
return "统计数据为空..."
|
||||
return await cls.__build_image(data_list, title)
|
||||
|
||||
@classmethod
|
||||
async def get_group_statistics(cls, group_id: str, day: int | None, title: str):
|
||||
query = Statistics.filter(group_id=group_id)
|
||||
if day:
|
||||
time = datetime.now() - timedelta(days=day)
|
||||
query = query.filter(create_time__gte=time)
|
||||
data_list = (
|
||||
await query.annotate(count=Count("id"))
|
||||
.group_by("plugin_name")
|
||||
.values_list("plugin_name", "count")
|
||||
)
|
||||
if not data_list:
|
||||
return "统计数据为空..."
|
||||
return await cls.__build_image(data_list, title)
|
||||
|
||||
@classmethod
|
||||
async def __build_image(cls, data_list: list[tuple[str, int]], title: str):
|
||||
mat = BuildMat(MatType.BARH)
|
||||
module2count = {x[0]: x[1] for x in data_list}
|
||||
plugin_info = await PluginInfo.filter(
|
||||
module__in=module2count.keys(), plugin_type=PluginType.NORMAL
|
||||
).all()
|
||||
x_index = []
|
||||
data = []
|
||||
for plugin in plugin_info:
|
||||
x_index.append(plugin.name)
|
||||
data.append(module2count.get(plugin.module, 0))
|
||||
mat.x_index = x_index
|
||||
mat.data = data
|
||||
mat.title = title
|
||||
return await mat.build()
|
||||
170
zhenxun/plugins/statistics/statistics_handle.py
Normal file
170
zhenxun/plugins/statistics/statistics_handle.py
Normal file
@ -0,0 +1,170 @@
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
import ujson as json
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
Args,
|
||||
Arparma,
|
||||
Match,
|
||||
Option,
|
||||
on_alconna,
|
||||
store_true,
|
||||
)
|
||||
from nonebot_plugin_saa import Image, Text
|
||||
from nonebot_plugin_session import EventSession
|
||||
|
||||
from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH
|
||||
from zhenxun.configs.utils import PluginExtraData
|
||||
from zhenxun.models.group_info import GroupInfo
|
||||
from zhenxun.utils.depends import OneCommand
|
||||
from zhenxun.utils.enum import PluginType
|
||||
from zhenxun.utils.image_utils import BuildMat
|
||||
|
||||
from ._data_source import StatisticsManage
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="功能调用统计可视化",
|
||||
description="功能调用统计可视化",
|
||||
usage="""
|
||||
usage:
|
||||
功能调用统计可视化
|
||||
指令:
|
||||
功能调用统计
|
||||
日功能调用统计
|
||||
周功能调用统计
|
||||
月功能调用统计
|
||||
我的功能调用统计 : 当前群我的统计
|
||||
我的功能调用统计 -g: 我的全局统计
|
||||
我的日功能调用统计
|
||||
我的周功能调用统计
|
||||
我的月功能调用统计
|
||||
""".strip(),
|
||||
extra=PluginExtraData(
|
||||
author="HibiKier",
|
||||
version="0.1",
|
||||
plugin_type=PluginType.ADMIN,
|
||||
menu_type="数据统计",
|
||||
aliases={"功能调用统计"},
|
||||
superuser_help="""
|
||||
"全局功能调用统计",
|
||||
"全局日功能调用统计",
|
||||
"全局周功能调用统计",
|
||||
"全局月功能调用统计",
|
||||
""".strip(),
|
||||
).dict(),
|
||||
)
|
||||
|
||||
|
||||
_matcher = on_alconna(
|
||||
Alconna(
|
||||
"功能调用统计",
|
||||
Args["name?", str],
|
||||
Option("-g|--global", action=store_true, help_text="全局统计"),
|
||||
Option("-my", action=store_true, help_text="我的"),
|
||||
Option("-t|--type", Args["search_type", ["day", "week", "month"]]),
|
||||
),
|
||||
priority=5,
|
||||
block=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"日功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "day"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"周功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "week"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"月功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "month"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"全局功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-g"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"全局日功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "day", "-g"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"全局周功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "week", "-g"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"全局月功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "month", "-g"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"我的功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-my"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"我的日功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "day", "-my"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"我的周功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "week", "-my"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
_matcher.shortcut(
|
||||
"我的月功能调用统计(?P<name>.*)",
|
||||
command="功能调用统计",
|
||||
arguments=["{name}", "-t", "month", "-my"],
|
||||
prefix=True,
|
||||
)
|
||||
|
||||
|
||||
@_matcher.handle()
|
||||
async def _(
|
||||
session: EventSession, arparma: Arparma, name: Match[str], search_type: Match[str]
|
||||
):
|
||||
plugin_name = name.result if name.available else None
|
||||
st = search_type.result if search_type.available else None
|
||||
uid = session.id1 if arparma.find("my") else None
|
||||
gid = session.id3 or session.id2
|
||||
is_global = arparma.find("global")
|
||||
if uid and is_global:
|
||||
"""个人全局"""
|
||||
gid = None
|
||||
if result := await StatisticsManage.get_statistics(
|
||||
plugin_name, arparma.find("global"), st, uid, gid
|
||||
):
|
||||
if isinstance(result, str):
|
||||
await Text(result).finish(reply=True)
|
||||
else:
|
||||
await Image(result.pic2bytes()).send()
|
||||
else:
|
||||
await Text("获取数据失败...").send()
|
||||
40
zhenxun/plugins/statistics/statistics_hook.py
Normal file
40
zhenxun/plugins/statistics/statistics_hook.py
Normal file
@ -0,0 +1,40 @@
|
||||
from datetime import datetime
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_postprocessor
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_session import EventSession
|
||||
|
||||
from zhenxun.configs.utils import PluginExtraData
|
||||
from zhenxun.models.plugin_info import PluginInfo
|
||||
from zhenxun.models.statistics import Statistics
|
||||
from zhenxun.utils.enum import PluginType
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="功能调用统计",
|
||||
description="功能调用统计",
|
||||
usage="""""".strip(),
|
||||
extra=PluginExtraData(
|
||||
author="HibiKier", version="0.1", plugin_type=PluginType.HIDDEN
|
||||
).dict(),
|
||||
)
|
||||
|
||||
|
||||
@run_postprocessor
|
||||
async def _(
|
||||
matcher: Matcher, exception: Exception | None, bot: Bot, session: EventSession
|
||||
):
|
||||
plugin = await PluginInfo.get_or_none(module=matcher.plugin_name)
|
||||
plugin_type = plugin.plugin_type if plugin else None
|
||||
if (
|
||||
plugin_type == PluginType.NORMAL
|
||||
and matcher.priority not in [1, 999]
|
||||
and matcher.plugin_name not in ["update_info", "statistics_handle"]
|
||||
):
|
||||
await Statistics.create(
|
||||
user_id=session.id1,
|
||||
group_id=session.id3 or session.id2,
|
||||
plugin_name=matcher.plugin_name,
|
||||
create_time=datetime.now(),
|
||||
)
|
||||
@ -33,7 +33,7 @@ class BuildMatData(BaseModel):
|
||||
"""显示轴坐标值"""
|
||||
y_index: list[int | float] = []
|
||||
"""数据轴坐标值"""
|
||||
space: tuple[int, int] = (15, 15)
|
||||
space: tuple[int, int] = (20, 20)
|
||||
"""坐标值间隔(X, Y)"""
|
||||
rotate: tuple[int, int] = (0, 0)
|
||||
"""坐标值旋转(X, Y)"""
|
||||
@ -68,9 +68,13 @@ class BuildMat:
|
||||
mark_image: BuildImage
|
||||
"""BuildImage"""
|
||||
x_height: int
|
||||
"""横坐标高度"""
|
||||
"""横坐标开始高度"""
|
||||
y_width: int
|
||||
"""纵坐标开始宽度"""
|
||||
x_point: list[int]
|
||||
"""横坐标坐标"""
|
||||
y_point: list[int]
|
||||
"""纵坐标坐标"""
|
||||
graph_height: int
|
||||
"""坐标轴高度"""
|
||||
|
||||
@ -233,9 +237,9 @@ class BuildMat:
|
||||
else:
|
||||
raise ValueError("y轴坐标值必须有序...")
|
||||
|
||||
async def build(self):
|
||||
async def build(self) -> BuildImage:
|
||||
"""构造图片"""
|
||||
A = None
|
||||
A = BuildImage(1, 1)
|
||||
bar_color = self.build_data.bar_color
|
||||
if "*" in bar_color:
|
||||
bar_color = [
|
||||
@ -252,12 +256,12 @@ class BuildMat:
|
||||
if self.build_data.mat_type == MatType.LINE:
|
||||
mark_image = await self._build_line_graph(init_graph, bar_color)
|
||||
if self.build_data.mat_type == MatType.BAR:
|
||||
pass
|
||||
mark_image = await self._build_bar_graph(init_graph, bar_color)
|
||||
if self.build_data.mat_type == MatType.BARH:
|
||||
pass
|
||||
mark_image = await self._build_barh_graph(init_graph, bar_color)
|
||||
if mark_image:
|
||||
padding_width, padding_height = self.build_data.padding
|
||||
width = mark_image.width + padding_width * 2
|
||||
width = mark_image.width + padding_width
|
||||
height = mark_image.height + padding_height * 2
|
||||
if self.build_data.background:
|
||||
if isinstance(self.build_data.background, bytes):
|
||||
@ -269,7 +273,7 @@ class BuildMat:
|
||||
else:
|
||||
A = BuildImage(width, height, self.build_data.background_color)
|
||||
if A:
|
||||
await A.paste(mark_image, (padding_width, padding_height))
|
||||
await A.paste(mark_image, (10, padding_height))
|
||||
if self.build_data.title:
|
||||
font = BuildImage.load_font(
|
||||
self.build_data.font, self.build_data.font_size + 7
|
||||
@ -305,108 +309,169 @@ class BuildMat:
|
||||
padding_width = 0
|
||||
padding_height = 0
|
||||
font = BuildImage.load_font(self.build_data.font, self.build_data.font_size)
|
||||
width_list = []
|
||||
height_list = []
|
||||
x_width_list = []
|
||||
y_height_list = []
|
||||
for x in self.build_data.x_index:
|
||||
text_size = BuildImage.get_text_size(x, font)
|
||||
if text_size[1] > padding_height:
|
||||
padding_height = text_size[1]
|
||||
width_list.append(text_size[0])
|
||||
x_width_list.append(text_size)
|
||||
if not self.build_data.y_index:
|
||||
"""没有指定y_index时,使用data自动生成"""
|
||||
max_num = max(self.build_data.data)
|
||||
if max_num < 5:
|
||||
max_num = 5
|
||||
s = int(max_num / 5)
|
||||
_y_index = [max_num]
|
||||
for _n in range(4):
|
||||
max_num -= s
|
||||
_y_index.append(max_num)
|
||||
_y_index.sort()
|
||||
self.build_data.y_index = _y_index
|
||||
# if len(_y_index) > 1:
|
||||
# if _y_index[0] == _y_index[-1]:
|
||||
# _tmp = ["_" for _ in range(len(_y_index) - 1)]
|
||||
# _tmp.append(str(_y_index[0]))
|
||||
# _y_index = _tmp
|
||||
self.build_data.y_index = _y_index # type: ignore
|
||||
for item in self.build_data.y_index:
|
||||
text_size = BuildImage.get_text_size(str(item), font)
|
||||
if text_size[0] > padding_width:
|
||||
padding_width = text_size[0]
|
||||
height_list.append(text_size[1])
|
||||
width = (
|
||||
sum([w + self.build_data.space[0] for w in width_list])
|
||||
+ height_list[0]
|
||||
+ self.build_data.space[0] * 2
|
||||
+ 20
|
||||
)
|
||||
y_height_list.append(text_size)
|
||||
if self.build_data.mat_type == MatType.BARH:
|
||||
_tmp = x_width_list
|
||||
x_width_list = y_height_list
|
||||
y_height_list = _tmp
|
||||
old_space = self.build_data.space
|
||||
width = padding_width * 2 + self.build_data.space[0] * 2 + 20
|
||||
height = (
|
||||
sum([h + self.build_data.space[1] for h in height_list])
|
||||
sum([h[1] + self.build_data.space[1] for h in y_height_list])
|
||||
+ self.build_data.space[1] * 2
|
||||
+ 30
|
||||
)
|
||||
_x_index = self.build_data.x_index
|
||||
_y_index = self.build_data.y_index
|
||||
_barh_max_text_width = 0
|
||||
if self.build_data.mat_type == MatType.BARH:
|
||||
"""横向柱状图时xy轴长度调换"""
|
||||
_tmp = height
|
||||
height = width
|
||||
width = _tmp
|
||||
"""XY轴下标互换"""
|
||||
_tmp = _y_index
|
||||
_y_index = _x_index
|
||||
_x_index = _tmp
|
||||
"""额外增加字体宽度"""
|
||||
for s in self.build_data.x_index:
|
||||
s_w, s_h = BuildImage.get_text_size(s, font)
|
||||
if s_w > _barh_max_text_width:
|
||||
_barh_max_text_width = s_w
|
||||
width += _barh_max_text_width
|
||||
width += self.build_data.space[0] * 2 - old_space[0] * 2
|
||||
"""X轴重新等均分配"""
|
||||
x_length = width - padding_width * 2 - _barh_max_text_width
|
||||
x_space = int((x_length - 20) / (len(_x_index) + 1))
|
||||
if x_space < 50:
|
||||
"""加大间距更加美观"""
|
||||
x_space = 50
|
||||
self.build_data.space = (x_space, self.build_data.space[1])
|
||||
width += self.build_data.space[0] * (len(_x_index) - 1)
|
||||
else:
|
||||
"""非横向柱状图时加字体宽度"""
|
||||
width += sum([w[0] + self.build_data.space[0] for w in x_width_list])
|
||||
|
||||
A = BuildImage(
|
||||
width,
|
||||
width + 5,
|
||||
(height + 10),
|
||||
# color=(255, 255, 255),
|
||||
color=(255, 255, 255, 0),
|
||||
)
|
||||
padding_height += 5
|
||||
"""高"""
|
||||
await A.line(
|
||||
(
|
||||
padding_width + 5,
|
||||
padding_width + 5 + _barh_max_text_width,
|
||||
padding_height,
|
||||
padding_width + 5,
|
||||
padding_width + 5 + _barh_max_text_width,
|
||||
height - padding_height,
|
||||
),
|
||||
width=2,
|
||||
)
|
||||
"""长"""
|
||||
await A.line(
|
||||
(
|
||||
padding_width + 5,
|
||||
padding_width + 5 + _barh_max_text_width,
|
||||
height - padding_height,
|
||||
width - padding_width + 5,
|
||||
height - padding_height,
|
||||
),
|
||||
width=2,
|
||||
)
|
||||
_x_index = self.build_data.x_index
|
||||
_y_index = self.build_data.y_index
|
||||
if self.build_data.mat_type == MatType.BARH:
|
||||
_tmp = _y_index
|
||||
_y_index = _x_index
|
||||
_x_index = _tmp
|
||||
cur_width = padding_width + self.build_data.space[0] * 2
|
||||
cur_height = height - height_list[0] - 5
|
||||
x_cur_width = (
|
||||
padding_width + _barh_max_text_width + self.build_data.space[0] + 5
|
||||
)
|
||||
if self.build_data.mat_type != MatType.BARH:
|
||||
"""添加字体宽度"""
|
||||
x_cur_width += x_width_list[0][0]
|
||||
x_cur_height = height - y_height_list[0][1] - 5
|
||||
# await A.point((x_cur_width, x_cur_height), (0, 0, 0))
|
||||
x_point = []
|
||||
for i, _x in enumerate(_x_index):
|
||||
"""X轴数值"""
|
||||
grid_height = cur_height
|
||||
grid_height = x_cur_height
|
||||
if self.build_data.is_grid:
|
||||
grid_height = padding_height
|
||||
await A.line((cur_width, cur_height - 1, cur_width, grid_height - 5))
|
||||
x_point.append(cur_width - 1)
|
||||
mid_point = cur_width - int(width_list[i] / 2)
|
||||
await A.text((mid_point, cur_height), str(_x), font=font)
|
||||
cur_width += width_list[i] + self.build_data.space[0]
|
||||
cur_width = padding_width
|
||||
cur_height = height - self.build_data.padding[1]
|
||||
await A.line(
|
||||
(
|
||||
x_cur_width,
|
||||
x_cur_height - 1,
|
||||
x_cur_width,
|
||||
grid_height - 5,
|
||||
)
|
||||
)
|
||||
x_point.append(x_cur_width - 1)
|
||||
mid_point = x_cur_width - int(x_width_list[i][0] / 2)
|
||||
await A.text((mid_point, x_cur_height), str(_x), font=font)
|
||||
x_cur_width += self.build_data.space[0]
|
||||
if self.build_data.mat_type != MatType.BARH:
|
||||
"""添加字体宽度"""
|
||||
x_cur_width += x_width_list[i][0]
|
||||
y_cur_width = padding_width + _barh_max_text_width
|
||||
y_cur_height = height - self.build_data.padding[1] - 9
|
||||
start_height = y_cur_height
|
||||
# await A.point((y_cur_width, y_cur_height), (0, 0, 0))
|
||||
y_point = []
|
||||
for i, _y in enumerate(_y_index):
|
||||
"""Y轴数值"""
|
||||
grid_width = cur_width
|
||||
grid_width = y_cur_width
|
||||
if self.build_data.is_grid:
|
||||
grid_width = width - padding_width + 5
|
||||
await A.line((cur_width + 5, cur_height, grid_width + 11, cur_height))
|
||||
y_point.append(y_cur_height)
|
||||
await A.line((y_cur_width + 5, y_cur_height, grid_width + 11, y_cur_height))
|
||||
text_width = BuildImage.get_text_size(str(_y), font)[0]
|
||||
await A.text(
|
||||
(cur_width - text_width, cur_height - int(height_list[i] / 2) - 3),
|
||||
(
|
||||
y_cur_width - text_width,
|
||||
y_cur_height - int(y_height_list[i][1] / 2) - 3,
|
||||
),
|
||||
str(_y),
|
||||
font=font,
|
||||
)
|
||||
cur_height -= height_list[i] + self.build_data.space[1]
|
||||
graph_height = height - self.build_data.padding[1] - cur_height + 5
|
||||
y_cur_height -= y_height_list[i][1] + self.build_data.space[1]
|
||||
graph_height = 0
|
||||
if self.build_data.mat_type == MatType.BARH:
|
||||
graph_height = (
|
||||
x_cur_width
|
||||
- self.build_data.space[0]
|
||||
- _barh_max_text_width
|
||||
- padding_width
|
||||
- 5
|
||||
)
|
||||
else:
|
||||
graph_height = start_height - y_cur_height + 7
|
||||
return self.InitGraph(
|
||||
mark_image=A,
|
||||
x_height=height - height_list[0] - 5,
|
||||
x_height=height - y_height_list[0][1] - 5,
|
||||
y_width=padding_width + 5 + _barh_max_text_width,
|
||||
graph_height=graph_height,
|
||||
x_point=x_point,
|
||||
y_point=y_point,
|
||||
)
|
||||
|
||||
async def _build_line_graph(
|
||||
@ -432,9 +497,9 @@ class BuildMat:
|
||||
point_list = []
|
||||
for x_p, y in zip(init_graph.x_point, self.build_data.data):
|
||||
"""折线图标点"""
|
||||
y_height = int(y / max_num * init_graph.graph_height)
|
||||
await mark_image.paste(_black_point, (x_p, x_height - y_height))
|
||||
point_list.append((x_p + 4, x_height - y_height + 4))
|
||||
y_height = int(y / max_num * graph_height)
|
||||
await mark_image.paste(_black_point, (x_p - 3, x_height - y_height))
|
||||
point_list.append((x_p + 1, x_height - y_height + 1))
|
||||
for i in range(len(point_list) - 1):
|
||||
"""画线"""
|
||||
a_x, a_y = point_list[i]
|
||||
@ -462,8 +527,41 @@ class BuildMat:
|
||||
)
|
||||
return mark_image
|
||||
|
||||
async def _build_bar_graph(self):
|
||||
async def _build_bar_graph(self, init_graph: InitGraph, bar_color: list[str]):
|
||||
"""构建折线图
|
||||
|
||||
参数:
|
||||
init_graph: InitGraph
|
||||
bar_color: 颜色列表
|
||||
|
||||
返回:
|
||||
BuildImage: 折线图
|
||||
"""
|
||||
pass
|
||||
|
||||
async def _build_barh_graph(self):
|
||||
pass
|
||||
async def _build_barh_graph(self, init_graph: InitGraph, bar_color: list[str]):
|
||||
"""构建折线图
|
||||
|
||||
参数:
|
||||
init_graph: InitGraph
|
||||
bar_color: 颜色列表
|
||||
|
||||
返回:
|
||||
BuildImage: 横向柱状图
|
||||
"""
|
||||
font = BuildImage.load_font(self.build_data.font, self.build_data.font_size)
|
||||
mark_image = init_graph.mark_image
|
||||
y_width = init_graph.y_width
|
||||
graph_height = init_graph.graph_height
|
||||
random_color = random.choice(bar_color)
|
||||
max_num = max(self.y_index)
|
||||
for y_p, y in zip(init_graph.y_point, self.build_data.data):
|
||||
bar_width = int(y / max_num * graph_height)
|
||||
bar = BuildImage(bar_width, 18, random_color)
|
||||
await mark_image.paste(bar, (y_width + 1, y_p - 9))
|
||||
if self.build_data.display_num:
|
||||
"""显示数值"""
|
||||
await mark_image.text(
|
||||
(y_width + bar_width + 5, y_p - 12), str(y), font=font
|
||||
)
|
||||
return mark_image
|
||||
|
||||
Loading…
Reference in New Issue
Block a user