zhenxun_bot/plugins/statistics/statistics_handle.py
2022-04-04 20:33:37 +08:00

279 lines
10 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, MessageEvent, Message
from models.group_info import GroupInfo
from configs.path_config import DATA_PATH, IMAGE_PATH
from nonebot.params import CommandArg, Command
from utils.image_utils import BuildMat
from utils.message_builder import image
from utils.manager import plugins2settings_manager
from typing import Tuple
import asyncio
import os
try:
import ujson as json
except ModuleNotFoundError:
import json
__zx_plugin_name__ = "功能调用统计可视化"
__plugin_usage__ = """
usage
功能调用统计可视化
指令:
功能调用统计
日功能调用统计
周功能调用统计 ?[功能]
月功能调用统计 ?[功能]
我的功能调用统计
我的日功能调用统计 ?[功能]
我的周功能调用统计 ?[功能]
我的月功能调用统计 ?[功能]
""".strip()
__plugin_superuser_usage__ = """
usage
功能调用统计可视化
指令:
全局功能调用统计
全局日功能调用统计
全局周功能调用统计 ?[功能]
全局月功能调用统计 ?[功能]
""".strip()
__plugin_des__ = "功能调用统计可视化"
__plugin_cmd__ = [
"功能调用统计",
"全局功能调用统计 [_superuser]",
"全局日功能调用统计 [_superuser]",
"全局周功能调用统计 ?[功能] [_superuser]",
"全局月功能调用统计 ?[功能] [_superuser]",
"周功能调用统计 ?[功能]",
"月功能调用统计 ?[功能]",
"我的功能调用统计",
"我的日功能调用统计 ?[功能]",
"我的周功能调用统计 ?[功能]",
"我的月功能调用统计 ?[功能]",
]
__plugin_type__ = ("功能调用统计可视化", 1)
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["功能调用统计"],
}
statistics = on_command(
"功能调用统计",
aliases={
"全局功能调用统计",
"全局日功能调用统计",
"全局周功能调用统计",
"全局月功能调用统计",
"日功能调用统计",
"周功能调用统计",
"月功能调用统计",
"我的功能调用统计",
"我的日功能调用统计",
"我的周功能调用统计",
"我的月功能调用统计",
},
priority=5,
block=True,
)
statistics_group_file = DATA_PATH / "statistics" / "_prefix_count.json"
statistics_user_file = DATA_PATH / "statistics" / "_prefix_user_count.json"
@statistics.handle()
async def _(bot: Bot, event: MessageEvent, cmd: Tuple[str, ...] = Command(), arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip()
if cmd[0][:2] == "全局":
if str(event.user_id) in bot.config.superusers:
data: dict = json.load(open(statistics_group_file, "r", encoding="utf8"))
if cmd[0][2] == '':
_type = 'day_statistics'
elif cmd[0][2] == '':
_type = 'week_statistics'
elif cmd[0][2] == '':
_type = 'month_statistics'
else:
_type = 'total_statistics'
tmp_dict = {}
data = data[_type]
if _type in ["day_statistics", "total_statistics"]:
for key in data['total']:
tmp_dict[key] = data['total'][key]
else:
for group in data.keys():
if group != 'total':
for day in data[group].keys():
for plugin_name in data[group][day].keys():
if data[group][day][plugin_name] is not None:
if tmp_dict.get(plugin_name) is None:
tmp_dict[plugin_name] = 1
else:
tmp_dict[plugin_name] += data[group][day][plugin_name]
bar_graph = await init_bar_graph(tmp_dict, cmd[0])
await asyncio.get_event_loop().run_in_executor(None, bar_graph.gen_graph)
await statistics.finish(image(b64=bar_graph.pic2bs4()))
return
if cmd[0][:2] == "我的":
_type = "user"
key = str(event.user_id)
cmd = list(cmd)
cmd[0] = cmd[0][2:]
if not statistics_user_file.exists():
await statistics.finish("统计文件不存在...", at_sender=True)
else:
if not isinstance(event, GroupMessageEvent):
await statistics.finish("请在群内调用此功能...")
_type = "group"
key = str(event.group_id)
if not statistics_group_file.exists():
await statistics.finish("统计文件不存在...", at_sender=True)
plugin = ""
if cmd[0][0] == "":
arg = "day_statistics"
elif cmd[0][0] == "":
arg = "week_statistics"
elif cmd[0][0] == "":
arg = "month_statistics"
else:
arg = "total_statistics"
if msg:
plugin = plugins2settings_manager.get_plugin_module(msg)
if not plugin:
if arg not in ["day_statistics", "total_statistics"]:
await statistics.finish("未找到此功能的调用...", at_sender=True)
if _type == "group":
data: dict = json.load(open(statistics_group_file, "r", encoding="utf8"))
if not data[arg].get(str(event.group_id)):
await statistics.finish("该群统计数据不存在...", at_sender=True)
else:
data: dict = json.load(open(statistics_user_file, "r", encoding="utf8"))
if not data[arg].get(str(event.user_id)):
await statistics.finish("该用户统计数据不存在...", at_sender=True)
day_index = data["day_index"]
data = data[arg][key]
if _type == "group":
name = await GroupInfo.get_group_info(event.group_id)
name = name.group_name if name else str(event.group_id)
else:
name = event.sender.card or event.sender.nickname
img = await generate_statistics_img(data, arg, name, plugin, day_index)
await statistics.send(image(b64=img))
async def generate_statistics_img(
data: dict, arg: str, name: str, plugin: str, day_index: int
):
try:
plugin = plugins2settings_manager.get_plugin_data(plugin)['cmd'][0]
except (KeyError, IndexError):
pass
bar_graph = None
if arg == "day_statistics":
bar_graph = await init_bar_graph(data, f"{name} 日功能调用统计")
elif arg == "week_statistics":
if plugin:
current_week = day_index % 7
week_lst = []
if current_week == 0:
week_lst = [1, 2, 3, 4, 5, 6, 7]
else:
for i in range(current_week + 1, 7):
week_lst.append(str(i))
for i in range(current_week + 1):
week_lst.append(str(i))
count = []
for i in range(7):
if int(week_lst[i]) == 7:
try:
count.append(data[str(0)][plugin])
except KeyError:
count.append(0)
else:
try:
count.append(data[str(week_lst[i])][plugin])
except KeyError:
count.append(0)
week_lst = ["7" if i == "0" else i for i in week_lst]
bar_graph = BuildMat(
y=count,
mat_type="line",
title=f"{name}{plugin} 功能调用统计【为7天统计】",
x_index=week_lst,
display_num=True,
background=[
f"{IMAGE_PATH}/background/create_mat/{x}"
for x in os.listdir(f"{IMAGE_PATH}/background/create_mat")
],
bar_color=["*"],
)
else:
bar_graph = await init_bar_graph(update_data(data), f"{name} 周功能调用统计【为7天统计】")
elif arg == "month_statistics":
if plugin:
day_index = day_index % 30
day_lst = []
for i in range(day_index + 1, 30):
day_lst.append(i)
for i in range(day_index + 1):
day_lst.append(i)
count = [data[str(day_lst[i])][plugin] for i in range(30)]
day_lst = [str(x + 1) for x in day_lst]
bar_graph = BuildMat(
y=count,
mat_type="line",
title=f"{name}{plugin} 功能调用统计【为30天统计】",
x_index=day_lst,
display_num=True,
background=[
f"{IMAGE_PATH}/background/create_mat/{x}"
for x in os.listdir(f"{IMAGE_PATH}/background/create_mat")
],
bar_color=["*"],
)
else:
bar_graph = await init_bar_graph(update_data(data), f"{name} 月功能调用统计【为30天统计】")
elif arg == "total_statistics":
bar_graph = await init_bar_graph(data, f"{name} 功能调用统计")
await asyncio.get_event_loop().run_in_executor(None, bar_graph.gen_graph)
return bar_graph.pic2bs4()
async def init_bar_graph(data: dict, title: str) -> BuildMat:
return await asyncio.get_event_loop().run_in_executor(None, _init_bar_graph, data, title)
def _init_bar_graph(data: dict, title: str) -> BuildMat:
bar_graph = BuildMat(
y=[data[x] for x in data.keys() if data[x] != 0],
mat_type="barh",
title=title,
x_index=[x for x in data.keys() if data[x] != 0],
display_num=True,
background=[
f"{IMAGE_PATH}/background/create_mat/{x}"
for x in os.listdir(f"{IMAGE_PATH}/background/create_mat")
],
bar_color=["*"],
)
return bar_graph
def update_data(data: dict):
tmp_dict = {}
for day in data.keys():
for plugin_name in data[day].keys():
# print(f'{day}{plugin_name} = {data[day][plugin_name]}')
if data[day][plugin_name] is not None:
if tmp_dict.get(plugin_name) is None:
tmp_dict[plugin_name] = 1
else:
tmp_dict[plugin_name] += data[day][plugin_name]
return tmp_dict