mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
Merge branch 'HibiKier:main' into main
This commit is contained in:
commit
d75653631d
@ -60,7 +60,7 @@ async def add_live_sub(live_id: int, sub_user: str) -> str:
|
||||
live_status=live_status,
|
||||
):
|
||||
await _get_up_status(live_id)
|
||||
uname = (await BilibiliSub.get_sub(live_id)).uname
|
||||
uname = (await BilibiliSub.get_sub(uid)).uname
|
||||
return (
|
||||
"已成功订阅主播:\n"
|
||||
f"\ttitle:{title}\n"
|
||||
|
||||
@ -128,7 +128,7 @@ async def get_character(
|
||||
) -> Optional[dict]:
|
||||
try:
|
||||
req = await AsyncHttpx.post(
|
||||
url="https://api-takumi.mihoyo.com/game_record/app/genshin/api/character",
|
||||
url="https://api-takumi-record.mihoyo.com/game_record/app/genshin/api/character",
|
||||
headers={
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"DS": get_ds(
|
||||
@ -232,7 +232,7 @@ async def get_mys_data(uid: str, mys_id: Optional[str]) -> Optional[List[Dict]]:
|
||||
if mys_id:
|
||||
try:
|
||||
req = await AsyncHttpx.get(
|
||||
url=f"https://api-takumi.mihoyo.com/game_record/card/wapi/getGameRecordCard?uid={mys_id}",
|
||||
url=f"https://api-takumi-record.mihoyo.com/game_record/card/wapi/getGameRecordCard?uid={mys_id}",
|
||||
headers={
|
||||
"DS": get_ds(f"uid={mys_id}"),
|
||||
"x-rpc-app_version": Config.get_config("genshin", "mhyVersion"),
|
||||
|
||||
200
plugins/word_clouds/__init__.py
Normal file
200
plugins/word_clouds/__init__.py
Normal file
@ -0,0 +1,200 @@
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Tuple, Union
|
||||
import pytz
|
||||
from nonebot import on_command, get_driver
|
||||
from nonebot.adapters.onebot.v11 import Message, MessageSegment
|
||||
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.params import Arg, Command, CommandArg, Depends
|
||||
from nonebot.typing import T_State
|
||||
from .data_source import draw_word_cloud, get_list_msg
|
||||
from configs.config import Config
|
||||
|
||||
__zx_plugin_name__ = "词云"
|
||||
|
||||
__plugin_usage__ = """
|
||||
usage:
|
||||
词云
|
||||
指令:
|
||||
获取今天的词云
|
||||
今日词云
|
||||
获取昨天的词云
|
||||
昨日词云
|
||||
获取本周词云
|
||||
本周词云
|
||||
获取本月词云
|
||||
本月词云
|
||||
获取年度词云
|
||||
年度词云
|
||||
|
||||
历史词云(支持 ISO8601 格式的日期与时间,如 2022-02-22T22:22:22)
|
||||
获取某日的词云
|
||||
历史词云 2022-01-01
|
||||
获取指定时间段的词云
|
||||
历史词云
|
||||
历史词云 2022-01-01~2022-02-22
|
||||
历史词云 2022-02-22T11:11:11~2022-02-22T22:22:22
|
||||
如果想要获取自己的发言,可在命令前添加 我的
|
||||
我的今日词云
|
||||
""".strip()
|
||||
__plugin_des__ = "词云"
|
||||
__plugin_cmd__ = ["今日词云", "昨日词云", "本周词云"]
|
||||
__plugin_version__ = 0.1
|
||||
__plugin_author__ = "yajiwa"
|
||||
__plugin_settings__ = {
|
||||
"level": 5,
|
||||
"default_status": True,
|
||||
"limit_superuser": False,
|
||||
"cmd": __plugin_cmd__,
|
||||
}
|
||||
wordcloud_cmd = on_command(
|
||||
"wordcloud",
|
||||
aliases={
|
||||
"词云",
|
||||
"今日词云",
|
||||
"昨日词云",
|
||||
"本周词云",
|
||||
"本月词云",
|
||||
"年度词云",
|
||||
"历史词云",
|
||||
"我的今日词云",
|
||||
"我的昨日词云",
|
||||
"我的本周词云",
|
||||
"我的本月词云",
|
||||
"我的年度词云",
|
||||
"我的历史词云",
|
||||
}, priority=5,
|
||||
)
|
||||
Config.add_plugin_config(
|
||||
"word_clouds",
|
||||
"WORD_CLOUDS_TEMPLATE",
|
||||
1,
|
||||
help_="词云模板 参1:图片生成,默认使用真寻图片,可在项目路径resources/image/wordcloud下配置图片,多张则随机 | 参2/其他:黑底图片"
|
||||
)
|
||||
|
||||
|
||||
def parse_datetime(key: str):
|
||||
"""解析数字,并将结果存入 state 中"""
|
||||
|
||||
async def _key_parser(
|
||||
matcher: Matcher,
|
||||
state: T_State,
|
||||
input: Union[datetime, Message] = Arg(key),
|
||||
):
|
||||
if isinstance(input, datetime):
|
||||
return
|
||||
|
||||
plaintext = input.extract_plain_text()
|
||||
try:
|
||||
state[key] = get_datetime_fromisoformat_with_timezone(plaintext)
|
||||
except ValueError:
|
||||
await matcher.reject_arg(key, "请输入正确的日期,不然我没法理解呢!")
|
||||
|
||||
return _key_parser
|
||||
|
||||
|
||||
def get_datetime_now_with_timezone() -> datetime:
|
||||
"""获取当前时间,并包含时区信息"""
|
||||
return datetime.now().astimezone()
|
||||
|
||||
|
||||
def get_datetime_fromisoformat_with_timezone(date_string: str) -> datetime:
|
||||
"""从 iso8601 格式字符串中获取时间,并包含时区信息"""
|
||||
return datetime.fromisoformat(date_string).astimezone()
|
||||
|
||||
|
||||
@wordcloud_cmd.handle()
|
||||
async def handle_first_receive(
|
||||
event: GroupMessageEvent,
|
||||
state: T_State,
|
||||
commands: Tuple[str, ...] = Command(),
|
||||
args: Message = CommandArg(),
|
||||
):
|
||||
command = commands[0]
|
||||
|
||||
if command.startswith("我的"):
|
||||
state["my"] = True
|
||||
command = command[2:]
|
||||
else:
|
||||
state["my"] = False
|
||||
|
||||
if command == "今日词云":
|
||||
dt = get_datetime_now_with_timezone()
|
||||
state["start"] = dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
state["stop"] = dt
|
||||
elif command == "昨日词云":
|
||||
dt = get_datetime_now_with_timezone()
|
||||
state["stop"] = dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
state["start"] = state["stop"] - timedelta(days=1)
|
||||
elif command == "本周词云":
|
||||
dt = get_datetime_now_with_timezone()
|
||||
state["start"] = dt.replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
) - timedelta(days=dt.weekday())
|
||||
state["stop"] = dt
|
||||
elif command == "本月词云":
|
||||
dt = get_datetime_now_with_timezone()
|
||||
state["start"] = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
|
||||
state["stop"] = dt
|
||||
elif command == "年度词云":
|
||||
dt = get_datetime_now_with_timezone()
|
||||
state["start"] = dt.replace(
|
||||
month=1, day=1, hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
state["stop"] = dt
|
||||
elif command == "历史词云":
|
||||
plaintext = args.extract_plain_text().strip()
|
||||
match = re.match(r"^(.+?)(?:~(.+))?$", plaintext)
|
||||
if match:
|
||||
start = match.group(1)
|
||||
stop = match.group(2)
|
||||
try:
|
||||
state["start"] = get_datetime_fromisoformat_with_timezone(start)
|
||||
if stop:
|
||||
state["stop"] = get_datetime_fromisoformat_with_timezone(stop)
|
||||
else:
|
||||
# 如果没有指定结束日期,则认为是指查询这一天的词云
|
||||
state["start"] = state["start"].replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
state["stop"] = state["start"] + timedelta(days=1)
|
||||
except ValueError:
|
||||
await wordcloud_cmd.finish("请输入正确的日期,不然我没法理解呢!")
|
||||
else:
|
||||
await wordcloud_cmd.finish()
|
||||
|
||||
|
||||
@wordcloud_cmd.got(
|
||||
"start",
|
||||
prompt="请输入你要查询的起始日期(如 2022-01-01)",
|
||||
parameterless=[Depends(parse_datetime("start"))],
|
||||
)
|
||||
@wordcloud_cmd.got(
|
||||
"stop",
|
||||
prompt="请输入你要查询的结束日期(如 2022-02-22)",
|
||||
parameterless=[Depends(parse_datetime("stop"))],
|
||||
)
|
||||
async def handle_message(
|
||||
event: GroupMessageEvent,
|
||||
start: datetime = Arg(),
|
||||
stop: datetime = Arg(),
|
||||
my: bool = Arg(),
|
||||
):
|
||||
# 是否只查询自己的记录
|
||||
if my:
|
||||
user_id = int(event.user_id)
|
||||
else:
|
||||
user_id = None
|
||||
# 将时间转换到 东八 时区
|
||||
messages = await get_list_msg(user_id, int(event.group_id),
|
||||
days=(start.astimezone(pytz.timezone("Asia/Shanghai")),
|
||||
stop.astimezone(pytz.timezone("Asia/Shanghai"))))
|
||||
if messages:
|
||||
image_bytes = await draw_word_cloud(messages, get_driver().config)
|
||||
if image_bytes:
|
||||
await wordcloud_cmd.finish(MessageSegment.image(image_bytes), at_sender=my)
|
||||
else:
|
||||
await wordcloud_cmd.finish("生成词云失败", at_sender=my)
|
||||
else:
|
||||
await wordcloud_cmd.finish("没有获取到词云数据", at_sender=my)
|
||||
121
plugins/word_clouds/data_source.py
Normal file
121
plugins/word_clouds/data_source.py
Normal file
@ -0,0 +1,121 @@
|
||||
import asyncio
|
||||
import os
|
||||
import random
|
||||
import jieba.analyse
|
||||
import re
|
||||
from typing import List
|
||||
from PIL import Image as IMG
|
||||
import jieba
|
||||
from emoji import replace_emoji # type: ignore
|
||||
from wordcloud import WordCloud, ImageColorGenerator
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
from io import BytesIO
|
||||
from configs.path_config import IMAGE_PATH, FONT_PATH
|
||||
from utils.http_utils import AsyncHttpx
|
||||
from models.chat_history import ChatHistory
|
||||
from configs.config import Config
|
||||
|
||||
|
||||
async def pre_precess(msg: List[str], config) -> str:
|
||||
return await asyncio.get_event_loop().run_in_executor(
|
||||
None, _pre_precess, msg,config)
|
||||
|
||||
|
||||
def _pre_precess(msg: List[str],config) -> str:
|
||||
"""对消息进行预处理"""
|
||||
# 过滤掉命令
|
||||
command_start = tuple([i for i in config.command_start if i])
|
||||
msg = " ".join([m for m in msg if not m.startswith(command_start)])
|
||||
|
||||
# 去除网址
|
||||
msg = re.sub(r"https?://[\w/:%#\$&\?\(\)~\.=\+\-]+", "", msg)
|
||||
|
||||
# 去除 \u200b
|
||||
msg = re.sub(r"[\u200b]", "", msg)
|
||||
|
||||
# 去除cq码
|
||||
msg = re.sub(r"\[CQ:.*?]", "", msg)
|
||||
|
||||
# 去除[]
|
||||
msg = re.sub("[	(1|3);]", "", msg)
|
||||
|
||||
# 去除 emoji
|
||||
# https://github.com/carpedm20/emoji
|
||||
msg = replace_emoji(msg)
|
||||
return msg
|
||||
|
||||
|
||||
|
||||
async def draw_word_cloud(messages, config):
|
||||
wordcloud_dir = IMAGE_PATH / "wordcloud"
|
||||
wordcloud_dir.mkdir(exist_ok=True, parents=True)
|
||||
# 默认用真寻图片
|
||||
zx_logo_path = wordcloud_dir / "default.png"
|
||||
wordcloud_ttf = FONT_PATH / "STKAITI.TTF"
|
||||
if not os.listdir(wordcloud_dir):
|
||||
url = "https://ghproxy.com/https://raw.githubusercontent.com/HibiKier/zhenxun_bot/main/resources/image/wordcloud/default.png"
|
||||
try:
|
||||
await AsyncHttpx.download_file(url, zx_logo_path)
|
||||
except:
|
||||
return False
|
||||
if not wordcloud_ttf.exists():
|
||||
ttf_url = 'https://ghproxy.com/https://raw.githubusercontent.com/HibiKier/zhenxun_bot/main/resources/font/STKAITI.TTF'
|
||||
try:
|
||||
await AsyncHttpx.download_file(ttf_url, wordcloud_ttf)
|
||||
except:
|
||||
return False
|
||||
|
||||
topK = min(int(len(messages)), 100000)
|
||||
read_name = jieba.analyse.extract_tags(await pre_precess(messages, config), topK=topK,
|
||||
withWeight=True,
|
||||
allowPOS=())
|
||||
name = []
|
||||
value = []
|
||||
for t in read_name:
|
||||
name.append(t[0])
|
||||
value.append(t[1])
|
||||
for i in range(len(name)):
|
||||
name[i] = str(name[i])
|
||||
dic = dict(zip(name, value))
|
||||
if Config.get_config("word_clouds", "WORD_CLOUDS_TEMPLATE") == 1:
|
||||
def random_pic(base_path: str) -> str:
|
||||
path_dir = os.listdir(base_path)
|
||||
path = random.sample(path_dir, 1)[0]
|
||||
return (str(base_path) + "/" + str(path))
|
||||
|
||||
mask = np.array(IMG.open(random_pic(wordcloud_dir)))
|
||||
wc = WordCloud(
|
||||
font_path=f"{wordcloud_ttf}",
|
||||
background_color="white",
|
||||
max_font_size=100,
|
||||
width=1920,
|
||||
height=1080,
|
||||
mask=mask,
|
||||
)
|
||||
wc.generate_from_frequencies(dic)
|
||||
image_colors = ImageColorGenerator(mask, default_color=(255, 255, 255))
|
||||
wc.recolor(color_func=image_colors)
|
||||
plt.imshow(wc.recolor(color_func=image_colors), interpolation="bilinear")
|
||||
plt.axis("off")
|
||||
else:
|
||||
wc = WordCloud(
|
||||
font_path=str(wordcloud_ttf),
|
||||
width=1920,
|
||||
height=1200,
|
||||
background_color="black",
|
||||
)
|
||||
wc.generate_from_frequencies(dic)
|
||||
bytes_io = BytesIO()
|
||||
img = wc.to_image()
|
||||
img.save(bytes_io, format="PNG")
|
||||
return bytes_io.getvalue()
|
||||
|
||||
|
||||
async def get_list_msg(user_id, group_id, days):
|
||||
messages_list = await ChatHistory()._get_msg(uid=user_id, gid=group_id, type_="group", days=days).gino.all()
|
||||
if messages_list:
|
||||
messages = [i.text for i in messages_list]
|
||||
return messages
|
||||
else:
|
||||
return False
|
||||
BIN
resources/font/STKAITI.TTF
Normal file
BIN
resources/font/STKAITI.TTF
Normal file
Binary file not shown.
BIN
resources/image/wordcloud/default.png
Normal file
BIN
resources/image/wordcloud/default.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 728 KiB |
Loading…
Reference in New Issue
Block a user