diff --git a/plugins/bilibili_sub/data_source.py b/plugins/bilibili_sub/data_source.py index 111b0abe..ffb1d557 100755 --- a/plugins/bilibili_sub/data_source.py +++ b/plugins/bilibili_sub/data_source.py @@ -60,7 +60,7 @@ async def add_live_sub(live_id: int, sub_user: str) -> str: live_status=live_status, ): await _get_up_status(live_id) - uname = (await BilibiliSub.get_sub(live_id)).uname + uname = (await BilibiliSub.get_sub(uid)).uname return ( "已成功订阅主播:\n" f"\ttitle:{title}\n" diff --git a/plugins/genshin/query_user/query_role/data_source.py b/plugins/genshin/query_user/query_role/data_source.py index 8ecdb447..71a04842 100644 --- a/plugins/genshin/query_user/query_role/data_source.py +++ b/plugins/genshin/query_user/query_role/data_source.py @@ -128,7 +128,7 @@ async def get_character( ) -> Optional[dict]: try: req = await AsyncHttpx.post( - url="https://api-takumi.mihoyo.com/game_record/app/genshin/api/character", + url="https://api-takumi-record.mihoyo.com/game_record/app/genshin/api/character", headers={ "Accept": "application/json, text/plain, */*", "DS": get_ds( @@ -232,7 +232,7 @@ async def get_mys_data(uid: str, mys_id: Optional[str]) -> Optional[List[Dict]]: if mys_id: try: req = await AsyncHttpx.get( - url=f"https://api-takumi.mihoyo.com/game_record/card/wapi/getGameRecordCard?uid={mys_id}", + url=f"https://api-takumi-record.mihoyo.com/game_record/card/wapi/getGameRecordCard?uid={mys_id}", headers={ "DS": get_ds(f"uid={mys_id}"), "x-rpc-app_version": Config.get_config("genshin", "mhyVersion"), diff --git a/plugins/word_clouds/__init__.py b/plugins/word_clouds/__init__.py new file mode 100644 index 00000000..a821ff19 --- /dev/null +++ b/plugins/word_clouds/__init__.py @@ -0,0 +1,200 @@ +import re +from datetime import datetime, timedelta +from typing import Tuple, Union +import pytz +from nonebot import on_command, get_driver +from nonebot.adapters.onebot.v11 import Message, MessageSegment +from nonebot.adapters.onebot.v11.event import GroupMessageEvent +from nonebot.matcher import Matcher +from nonebot.params import Arg, Command, CommandArg, Depends +from nonebot.typing import T_State +from .data_source import draw_word_cloud, get_list_msg +from configs.config import Config + +__zx_plugin_name__ = "词云" + +__plugin_usage__ = """ +usage: + 词云 + 指令: + 获取今天的词云 + 今日词云 + 获取昨天的词云 + 昨日词云 + 获取本周词云 + 本周词云 + 获取本月词云 + 本月词云 + 获取年度词云 + 年度词云 + + 历史词云(支持 ISO8601 格式的日期与时间,如 2022-02-22T22:22:22) + 获取某日的词云 + 历史词云 2022-01-01 + 获取指定时间段的词云 + 历史词云 + 历史词云 2022-01-01~2022-02-22 + 历史词云 2022-02-22T11:11:11~2022-02-22T22:22:22 + 如果想要获取自己的发言,可在命令前添加 我的 + 我的今日词云 +""".strip() +__plugin_des__ = "词云" +__plugin_cmd__ = ["今日词云", "昨日词云", "本周词云"] +__plugin_version__ = 0.1 +__plugin_author__ = "yajiwa" +__plugin_settings__ = { + "level": 5, + "default_status": True, + "limit_superuser": False, + "cmd": __plugin_cmd__, +} +wordcloud_cmd = on_command( + "wordcloud", + aliases={ + "词云", + "今日词云", + "昨日词云", + "本周词云", + "本月词云", + "年度词云", + "历史词云", + "我的今日词云", + "我的昨日词云", + "我的本周词云", + "我的本月词云", + "我的年度词云", + "我的历史词云", + }, priority=5, +) +Config.add_plugin_config( + "word_clouds", + "WORD_CLOUDS_TEMPLATE", + 1, + help_="词云模板 参1:图片生成,默认使用真寻图片,可在项目路径resources/image/wordcloud下配置图片,多张则随机 | 参2/其他:黑底图片" +) + + +def parse_datetime(key: str): + """解析数字,并将结果存入 state 中""" + + async def _key_parser( + matcher: Matcher, + state: T_State, + input: Union[datetime, Message] = Arg(key), + ): + if isinstance(input, datetime): + return + + plaintext = input.extract_plain_text() + try: + state[key] = get_datetime_fromisoformat_with_timezone(plaintext) + except ValueError: + await matcher.reject_arg(key, "请输入正确的日期,不然我没法理解呢!") + + return _key_parser + + +def get_datetime_now_with_timezone() -> datetime: + """获取当前时间,并包含时区信息""" + return datetime.now().astimezone() + + +def get_datetime_fromisoformat_with_timezone(date_string: str) -> datetime: + """从 iso8601 格式字符串中获取时间,并包含时区信息""" + return datetime.fromisoformat(date_string).astimezone() + + +@wordcloud_cmd.handle() +async def handle_first_receive( + event: GroupMessageEvent, + state: T_State, + commands: Tuple[str, ...] = Command(), + args: Message = CommandArg(), +): + command = commands[0] + + if command.startswith("我的"): + state["my"] = True + command = command[2:] + else: + state["my"] = False + + if command == "今日词云": + dt = get_datetime_now_with_timezone() + state["start"] = dt.replace(hour=0, minute=0, second=0, microsecond=0) + state["stop"] = dt + elif command == "昨日词云": + dt = get_datetime_now_with_timezone() + state["stop"] = dt.replace(hour=0, minute=0, second=0, microsecond=0) + state["start"] = state["stop"] - timedelta(days=1) + elif command == "本周词云": + dt = get_datetime_now_with_timezone() + state["start"] = dt.replace( + hour=0, minute=0, second=0, microsecond=0 + ) - timedelta(days=dt.weekday()) + state["stop"] = dt + elif command == "本月词云": + dt = get_datetime_now_with_timezone() + state["start"] = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + state["stop"] = dt + elif command == "年度词云": + dt = get_datetime_now_with_timezone() + state["start"] = dt.replace( + month=1, day=1, hour=0, minute=0, second=0, microsecond=0 + ) + state["stop"] = dt + elif command == "历史词云": + plaintext = args.extract_plain_text().strip() + match = re.match(r"^(.+?)(?:~(.+))?$", plaintext) + if match: + start = match.group(1) + stop = match.group(2) + try: + state["start"] = get_datetime_fromisoformat_with_timezone(start) + if stop: + state["stop"] = get_datetime_fromisoformat_with_timezone(stop) + else: + # 如果没有指定结束日期,则认为是指查询这一天的词云 + state["start"] = state["start"].replace( + hour=0, minute=0, second=0, microsecond=0 + ) + state["stop"] = state["start"] + timedelta(days=1) + except ValueError: + await wordcloud_cmd.finish("请输入正确的日期,不然我没法理解呢!") + else: + await wordcloud_cmd.finish() + + +@wordcloud_cmd.got( + "start", + prompt="请输入你要查询的起始日期(如 2022-01-01)", + parameterless=[Depends(parse_datetime("start"))], +) +@wordcloud_cmd.got( + "stop", + prompt="请输入你要查询的结束日期(如 2022-02-22)", + parameterless=[Depends(parse_datetime("stop"))], +) +async def handle_message( + event: GroupMessageEvent, + start: datetime = Arg(), + stop: datetime = Arg(), + my: bool = Arg(), +): + # 是否只查询自己的记录 + if my: + user_id = int(event.user_id) + else: + user_id = None + # 将时间转换到 东八 时区 + messages = await get_list_msg(user_id, int(event.group_id), + days=(start.astimezone(pytz.timezone("Asia/Shanghai")), + stop.astimezone(pytz.timezone("Asia/Shanghai")))) + if messages: + image_bytes = await draw_word_cloud(messages, get_driver().config) + if image_bytes: + await wordcloud_cmd.finish(MessageSegment.image(image_bytes), at_sender=my) + else: + await wordcloud_cmd.finish("生成词云失败", at_sender=my) + else: + await wordcloud_cmd.finish("没有获取到词云数据", at_sender=my) diff --git a/plugins/word_clouds/data_source.py b/plugins/word_clouds/data_source.py new file mode 100644 index 00000000..17067e42 --- /dev/null +++ b/plugins/word_clouds/data_source.py @@ -0,0 +1,121 @@ +import asyncio +import os +import random +import jieba.analyse +import re +from typing import List +from PIL import Image as IMG +import jieba +from emoji import replace_emoji # type: ignore +from wordcloud import WordCloud, ImageColorGenerator +import numpy as np +import matplotlib.pyplot as plt +from io import BytesIO +from configs.path_config import IMAGE_PATH, FONT_PATH +from utils.http_utils import AsyncHttpx +from models.chat_history import ChatHistory +from configs.config import Config + + +async def pre_precess(msg: List[str], config) -> str: + return await asyncio.get_event_loop().run_in_executor( + None, _pre_precess, msg,config) + + +def _pre_precess(msg: List[str],config) -> str: + """对消息进行预处理""" + # 过滤掉命令 + command_start = tuple([i for i in config.command_start if i]) + msg = " ".join([m for m in msg if not m.startswith(command_start)]) + + # 去除网址 + msg = re.sub(r"https?://[\w/:%#\$&\?\(\)~\.=\+\-]+", "", msg) + + # 去除 \u200b + msg = re.sub(r"[\u200b]", "", msg) + + # 去除cq码 + msg = re.sub(r"\[CQ:.*?]", "", msg) + + # 去除[] + msg = re.sub("[ (1|3);]", "", msg) + + # 去除 emoji + # https://github.com/carpedm20/emoji + msg = replace_emoji(msg) + return msg + + + +async def draw_word_cloud(messages, config): + wordcloud_dir = IMAGE_PATH / "wordcloud" + wordcloud_dir.mkdir(exist_ok=True, parents=True) + # 默认用真寻图片 + zx_logo_path = wordcloud_dir / "default.png" + wordcloud_ttf = FONT_PATH / "STKAITI.TTF" + if not os.listdir(wordcloud_dir): + url = "https://ghproxy.com/https://raw.githubusercontent.com/HibiKier/zhenxun_bot/main/resources/image/wordcloud/default.png" + try: + await AsyncHttpx.download_file(url, zx_logo_path) + except: + return False + if not wordcloud_ttf.exists(): + ttf_url = 'https://ghproxy.com/https://raw.githubusercontent.com/HibiKier/zhenxun_bot/main/resources/font/STKAITI.TTF' + try: + await AsyncHttpx.download_file(ttf_url, wordcloud_ttf) + except: + return False + + topK = min(int(len(messages)), 100000) + read_name = jieba.analyse.extract_tags(await pre_precess(messages, config), topK=topK, + withWeight=True, + allowPOS=()) + name = [] + value = [] + for t in read_name: + name.append(t[0]) + value.append(t[1]) + for i in range(len(name)): + name[i] = str(name[i]) + dic = dict(zip(name, value)) + if Config.get_config("word_clouds", "WORD_CLOUDS_TEMPLATE") == 1: + def random_pic(base_path: str) -> str: + path_dir = os.listdir(base_path) + path = random.sample(path_dir, 1)[0] + return (str(base_path) + "/" + str(path)) + + mask = np.array(IMG.open(random_pic(wordcloud_dir))) + wc = WordCloud( + font_path=f"{wordcloud_ttf}", + background_color="white", + max_font_size=100, + width=1920, + height=1080, + mask=mask, + ) + wc.generate_from_frequencies(dic) + image_colors = ImageColorGenerator(mask, default_color=(255, 255, 255)) + wc.recolor(color_func=image_colors) + plt.imshow(wc.recolor(color_func=image_colors), interpolation="bilinear") + plt.axis("off") + else: + wc = WordCloud( + font_path=str(wordcloud_ttf), + width=1920, + height=1200, + background_color="black", + ) + wc.generate_from_frequencies(dic) + bytes_io = BytesIO() + img = wc.to_image() + img.save(bytes_io, format="PNG") + return bytes_io.getvalue() + + +async def get_list_msg(user_id, group_id, days): + messages_list = await ChatHistory()._get_msg(uid=user_id, gid=group_id, type_="group", days=days).gino.all() + if messages_list: + messages = [i.text for i in messages_list] + return messages + else: + return False diff --git a/resources/font/STKAITI.TTF b/resources/font/STKAITI.TTF new file mode 100644 index 00000000..50441161 Binary files /dev/null and b/resources/font/STKAITI.TTF differ diff --git a/resources/image/wordcloud/default.png b/resources/image/wordcloud/default.png new file mode 100644 index 00000000..7912ce8b Binary files /dev/null and b/resources/image/wordcloud/default.png differ