diff --git a/README.md b/README.md index b1cfed2e..4481cda2 100644 --- a/README.md +++ b/README.md @@ -296,6 +296,12 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能 ## 更新 +### 2022/12/23 + +* 优化`管理员帮助`,`超级用户帮助`图片 +* 重新移植`gamedraw` +* 修复pil帮助私聊时无法生成 + ### 2022/12/17 * 修复查看插件仓库当已安装插件版本不一致时出错 diff --git a/basic_plugins/admin_help/__init__.py b/basic_plugins/admin_help/__init__.py index e2f3ddd5..938aaab2 100755 --- a/basic_plugins/admin_help/__init__.py +++ b/basic_plugins/admin_help/__init__.py @@ -3,7 +3,7 @@ from nonebot.typing import T_State from nonebot.adapters import Bot from nonebot.adapters.onebot.v11 import GroupMessageEvent from utils.message_builder import image -from .data_source import create_help_image, admin_help_image +from .data_source import create_help_image, ADMIN_HELP_IMAGE __zx_plugin_name__ = '管理帮助 [Admin]' @@ -16,12 +16,12 @@ __plugin_settings__ = { admin_help = on_command("管理员帮助", aliases={"管理帮助"}, priority=5, block=True) -if admin_help_image.exists(): - admin_help_image.unlink() +if ADMIN_HELP_IMAGE.exists(): + ADMIN_HELP_IMAGE.unlink() @admin_help.handle() async def _(bot: Bot, event: GroupMessageEvent, state: T_State): - if not admin_help_image.exists(): + if not ADMIN_HELP_IMAGE.exists(): await create_help_image() - await admin_help.send(image('admin_help_img.png')) + await admin_help.send(image(ADMIN_HELP_IMAGE)) diff --git a/basic_plugins/admin_help/data_source.py b/basic_plugins/admin_help/data_source.py index a9fa4f84..28303a90 100755 --- a/basic_plugins/admin_help/data_source.py +++ b/basic_plugins/admin_help/data_source.py @@ -1,25 +1,25 @@ -from utils.image_utils import BuildImage -from configs.path_config import IMAGE_PATH -from services.log import logger -from utils.utils import get_matchers -from utils.manager import group_manager -from nonebot.adapters.onebot.v11 import Bot -from nonebot import Driver import nonebot - +from configs.path_config import IMAGE_PATH +from nonebot import Driver +from services.log import logger +from utils.image_template import help_template +from utils.image_utils import (BuildImage, build_sort_image, group_image, + text2image) +from utils.manager import group_manager, plugin_data_manager +from utils.manager.models import PluginType driver: Driver = nonebot.get_driver() background = IMAGE_PATH / "background" / "0.png" -admin_help_image = IMAGE_PATH / 'admin_help_img.png' +ADMIN_HELP_IMAGE = IMAGE_PATH / "admin_help_img.png" @driver.on_bot_connect async def init_task(): if not group_manager.get_task_data(): group_manager.load_task() - logger.info(f'已成功加载 {len(group_manager.get_task_data())} 个被动技能.') + logger.info(f"已成功加载 {len(group_manager.get_task_data())} 个被动技能.") async def create_help_image(): @@ -33,58 +33,55 @@ async def _create_help_image(): """ 创建管理员帮助图片 """ - _matchers = get_matchers() - _plugin_name_list = [] - width = 0 - _plugin_level = {} - for matcher in _matchers: - _plugin = nonebot.plugin.get_plugin(matcher.plugin_name) - _module = _plugin.module + if ADMIN_HELP_IMAGE.exists(): + return + plugin_data_ = plugin_data_manager.get_data() + image_list = [] + task_list = [] + for plugin_data in [plugin_data_[x] for x in plugin_data_]: try: - plugin_name = _module.__getattribute__("__zx_plugin_name__") - except AttributeError: - continue - try: - if ( - "[admin]" in plugin_name.lower() - and plugin_name not in _plugin_name_list - and plugin_name != "管理帮助 [Admin]" - ): - _plugin_name_list.append(plugin_name) - plugin_settings = _module.__getattribute__("__plugin_settings__") - plugin_des = _module.__getattribute__("__plugin_des__") - plugin_cmd = _module.__getattribute__("__plugin_cmd__") - plugin_cmd = [x for x in plugin_cmd if "[_superuser]" not in x] - admin_level = int(plugin_settings["admin_level"]) - if _plugin_level.get(admin_level): - _plugin_level[admin_level].append( - f"[{admin_level}] {plugin_des} -> " + " / ".join(plugin_cmd) - ) - else: - _plugin_level[admin_level] = [ - f"[{admin_level}] {plugin_des} -> " + " / ".join(plugin_cmd) - ] - x = len(f"[{admin_level}] {plugin_des} -> " + " / ".join(plugin_cmd)) * 23 - width = width if width > x else x - except AttributeError: - logger.warning(f"获取管理插件 {matcher.plugin_name}: {plugin_name} 设置失败...") - help_str = "* 注: ‘*’ 代表可有多个相同参数 ‘?’ 代表可省略该参数 *\n\n" \ - "[权限等级] 管理员帮助:\n\n" - x = list(_plugin_level.keys()) - x.sort() - for level in x: - for help_ in _plugin_level[level]: - help_str += f"\t{help_}\n\n" - help_str += '-----[被动技能开关]-----\n\n' - task_data = group_manager.get_task_data() - for i, x in enumerate(task_data.keys()): - help_str += f'{i+1}.开启/关闭{task_data[x]}\n\n' - height = len(help_str.split("\n")) * 33 - A = BuildImage(width, height, font_size=24) - _background = BuildImage(width, height, background=background) - await A.apaste(_background, alpha=True) - await A.atext((150, 110), help_str) - await A.asave(admin_help_image) - logger.info(f'已成功加载 {len(_plugin_name_list)} 条管理员命令') - - + usage = None + if plugin_data.plugin_type == PluginType.ADMIN and plugin_data.usage: + usage = await text2image( + plugin_data.usage, padding=5, color=(204, 196, 151) + ) + if usage: + await usage.acircle_corner() + level = 5 + if plugin_data.plugin_setting: + level = plugin_data.plugin_setting.level or level + image = await help_template(plugin_data.name + f"[{level}]", usage) + image_list.append(image) + if plugin_data.task: + for x in plugin_data.task.keys(): + task_list.append(plugin_data.task[x]) + except Exception as e: + logger.warning( + f"获取群管理员插件 {plugin_data.model}: {plugin_data.name} 设置失败... {type(e)}:{e}" + ) + task_str = "\n".join(task_list) + task_str = "通过 开启/关闭 来控制群被动\n----------\n" + task_str + task_image = await text2image(task_str, padding=5, color=(204, 196, 151)) + task_image = await help_template("被动任务", task_image) + image_list.append(task_image) + image_group, _ = group_image(image_list) + A = await build_sort_image(image_group, color="#f9f6f2", padding_top=180) + await A.apaste( + BuildImage(0, 0, font="CJGaoDeGuo.otf", plain_text="群管理员帮助", font_size=50), + (50, 30), + True, + ) + await A.apaste( + BuildImage( + 0, + 0, + font="CJGaoDeGuo.otf", + plain_text="注: ‘*’ 代表可有多个相同参数 ‘?’ 代表可省略该参数", + font_size=30, + font_color="red", + ), + (50, 90), + True, + ) + await A.asave(ADMIN_HELP_IMAGE) + logger.info(f"已成功加载 {len(image_list)} 条管理员命令") diff --git a/basic_plugins/help/_data_source.py b/basic_plugins/help/_data_source.py index b13bb321..e53fa80c 100644 --- a/basic_plugins/help/_data_source.py +++ b/basic_plugins/help/_data_source.py @@ -17,17 +17,21 @@ background = IMAGE_PATH / "background" / "0.png" async def create_help_img(group_id: Optional[int]): """ - 生成帮助图片 - :param group_id: 群号 + 说明: + 生成帮助图片 + 参数: + :param group_id: 群号 """ await HelpImageBuild().build_image(group_id) def get_plugin_help(msg: str, is_super: bool = False) -> Optional[str]: """ - 获取功能的帮助信息 - :param msg: 功能cmd - :param is_super: 是否为超级用户 + 说明: + 获取功能的帮助信息 + 参数: + :param msg: 功能cmd + :param is_super: 是否为超级用户 """ module = plugins2settings_manager.get_plugin_module( msg diff --git a/basic_plugins/help/_utils.py b/basic_plugins/help/_utils.py index 1578d1e3..dd8d6093 100644 --- a/basic_plugins/help/_utils.py +++ b/basic_plugins/help/_utils.py @@ -1,17 +1,15 @@ -from typing import List, Tuple, Dict, Optional -from nonebot_plugin_htmlrender import template_to_pic - -from ._config import Item -from configs.path_config import IMAGE_PATH, TEMPLATE_PATH, DATA_PATH -from utils.decorator import Singleton -from utils.image_utils import BuildImage -from configs.config import Config import os import random +from typing import Dict, List, Optional -from utils.manager import plugin_data_manager, group_manager +from configs.config import Config +from configs.path_config import DATA_PATH, IMAGE_PATH, TEMPLATE_PATH +from utils.decorator import Singleton +from utils.image_utils import BuildImage, build_sort_image, group_image +from utils.manager import group_manager, plugin_data_manager from utils.manager.models import PluginData, PluginType +from ._config import Item GROUP_HELP_PATH = DATA_PATH / "group_help" GROUP_HELP_PATH.mkdir(exist_ok=True, parents=True) @@ -21,126 +19,27 @@ for x in os.listdir(GROUP_HELP_PATH): BACKGROUND_PATH = IMAGE_PATH / "background" / "help" / "simple_help" -LOGO_PATH = TEMPLATE_PATH / 'menu' / 'res' / 'logo' - - -async def build_help_image(image_group: List[List[BuildImage]], h: int): - bk = None - random_bk = os.listdir(BACKGROUND_PATH) - if random_bk: - bk = random.choice(random_bk) - A = BuildImage( - h, - h, - font_size=24, - color="#FFEFD5", - background=(BACKGROUND_PATH / bk) if bk else None, - ) - A.filter("GaussianBlur", 5) - curr_w = 50 - for ig in image_group: - curr_h = 180 - for img in ig: - await A.apaste(img, (curr_w, curr_h), True) - curr_h += img.h + 10 - curr_w += max([x.w for x in ig]) + 30 - return A - - -def group_image(image_list: List[BuildImage]) -> Tuple[List[List[BuildImage]], int]: - """ - 说明: - 根据图片大小进行分组 - 参数: - :param image_list: 排序图片列表 - """ - image_list.sort(key=lambda x: x.h, reverse=True) - max_image = max(image_list, key=lambda x: x.h) - - image_list.remove(max_image) - max_h = max_image.h - total_w = 0 - - # 图片分组 - image_group = [[max_image]] - is_use = [] - surplus_list = image_list[:] - - for image in image_list: - if image.uid not in is_use: - group = [image] - is_use.append(image.uid) - curr_h = image.h - while True: - surplus_list = [x for x in surplus_list if x.uid not in is_use] - for tmp in surplus_list: - temp_h = curr_h + tmp.h + 10 - if temp_h < max_h or abs(max_h - temp_h) < 100: - curr_h += tmp.h + 15 - is_use.append(tmp.uid) - group.append(tmp) - break - else: - break - total_w += max([x.w for x in group]) + 15 - image_group.append(group) - while surplus_list: - surplus_list = [x for x in surplus_list if x.uid not in is_use] - if not surplus_list: - break - surplus_list.sort(key=lambda x: x.h, reverse=True) - for img in surplus_list: - if img.uid not in is_use: - _w = 0 - index = -1 - for i, ig in enumerate(image_group): - if s := sum([x.h for x in ig]) > _w: - _w = s - index = i - if index != -1: - image_group[index].append(img) - is_use.append(img.uid) - max_h = 0 - max_w = 0 - for i, ig in enumerate(image_group): - if (_h := sum([x.h + 15 for x in ig])) > max_h: - max_h = _h - max_w += max([x.w for x in ig]) + 30 - is_use.clear() - while abs(max_h - max_w) > 200 and len(image_group) - 1 >= len(image_group[-1]): - for img in image_group[-1]: - _min_h = 0 - _min_index = -1 - for i, ig in enumerate(image_group): - if i not in is_use and (_h := sum([x.h for x in ig]) + img.h) > _min_h: - _min_h = _h - _min_index = i - is_use.append(_min_index) - image_group[_min_index].append(img) - max_w -= max([x.w for x in image_group[-1]]) - image_group.pop(-1) - return image_group, max(max_h + 250, max_w + 70) +LOGO_PATH = TEMPLATE_PATH / "menu" / "res" / "logo" @Singleton class HelpImageBuild: - def __init__(self): self._data: Dict[str, PluginData] = plugin_data_manager.get_data() self._sort_data: Dict[str, List[PluginData]] = {} self._image_list = [] self.icon2str = { - 'normal': 'fa fa-cog', - '原神相关': 'fa fa-circle-o', - '常规插件': 'fa fa-cubes', - '联系管理员': 'fa fa-envelope-o', - '抽卡相关': 'fa fa-credit-card-alt', - '来点好康的': 'fa fa-picture-o', - '数据统计': 'fa fa-bar-chart', - '一些工具': 'fa fa-shopping-cart', - '商店': 'fa fa-shopping-cart', - '其它': 'fa fa-tags', - '群内小游戏': 'fa fa-gamepad', + "normal": "fa fa-cog", + "原神相关": "fa fa-circle-o", + "常规插件": "fa fa-cubes", + "联系管理员": "fa fa-envelope-o", + "抽卡相关": "fa fa-credit-card-alt", + "来点好康的": "fa fa-picture-o", + "数据统计": "fa fa-bar-chart", + "一些工具": "fa fa-shopping-cart", + "商店": "fa fa-shopping-cart", + "其它": "fa fa-tags", + "群内小游戏": "fa fa-gamepad", } def sort_type(self): @@ -162,28 +61,36 @@ class HelpImageBuild: else: help_image = IMAGE_PATH / f"simple_help.png" build_type = Config.get_config("help", "TYPE") - if build_type == 'HTML': + if build_type == "HTML": byt = await self.build_html_image(group_id) - with open(help_image, 'wb') as f: + with open(help_image, "wb") as f: f.write(byt) else: img = await self.build_pil_image(group_id) img.save(help_image) async def build_html_image(self, group_id: Optional[int]) -> bytes: + from nonebot_plugin_htmlrender import template_to_pic self.sort_type() classify = {} for menu in self._sort_data: for plugin in self._sort_data[menu]: sta = 0 if not plugin.plugin_status.status: - if group_id and plugin.plugin_status.block_type in ['all', 'group']: + if group_id and plugin.plugin_status.block_type in ["all", "group"]: sta = 2 - if not group_id and plugin.plugin_status.block_type in ['all', 'private']: + if not group_id and plugin.plugin_status.block_type in [ + "all", + "private", + ]: sta = 2 - if group_id and not group_manager.get_plugin_super_status(plugin.model, group_id): + if group_id and not group_manager.get_plugin_super_status( + plugin.model, group_id + ): sta = 2 - if group_id and not group_manager.get_plugin_status(plugin.model, group_id): + if group_id and not group_manager.get_plugin_status( + plugin.model, group_id + ): sta = 1 if classify.get(menu): classify[menu].append(Item(plugin_name=plugin.name, sta=sta)) @@ -197,11 +104,14 @@ class HelpImageBuild: if plu in self.icon2str.keys(): icon = self.icon2str[plu] else: - icon = 'fa fa-pencil-square-o' + icon = "fa fa-pencil-square-o" logo = LOGO_PATH / random.choice(os.listdir(LOGO_PATH)) - # print(str(logo.absolute())) - data = {'name': plu if plu != 'normal' else '功能', 'items': classify[plu], 'icon': icon, - 'logo': str(logo.absolute())} + data = { + "name": plu if plu != "normal" else "功能", + "items": classify[plu], + "icon": icon, + "logo": str(logo.absolute()), + } if len(classify[plu]) > max_len: max_len = len(classify[plu]) flag_index = index @@ -210,8 +120,8 @@ class HelpImageBuild: del plugin_list[flag_index] plugin_list.insert(0, max_data) pic = await template_to_pic( - template_path=str((TEMPLATE_PATH / 'menu').absolute()), - template_name='zhenxun_menu.html', + template_path=str((TEMPLATE_PATH / "menu").absolute()), + template_name="zhenxun_menu.html", templates={"plugin_list": plugin_list}, pages={ "viewport": {"width": 1903, "height": 975}, @@ -270,7 +180,9 @@ class HelpImageBuild: if ( not plugin_data.plugin_status.status and plugin_data.plugin_status.block_type in ["group", "all"] - ) or not group_manager.get_plugin_super_status(plugin_data.model, group_id): + ) or (group_id and not group_manager.get_plugin_super_status( + plugin_data.model, group_id + )): w = curr_h + int(B.getsize(plugin_data.name)[1] / 2) + 2 pos = ( 7, @@ -305,7 +217,12 @@ class HelpImageBuild: # await bk.acircle_corner(point_list=['lt', 'rt']) self._image_list.append(bk) image_group, h = group_image(self._image_list) - B = await build_help_image(image_group, h) + B = await build_sort_image( + image_group, + h, + background_path=BACKGROUND_PATH, + background_handle=lambda image: image.filter("GaussianBlur", 5), + ) w = 10 h = 10 for msg in [ diff --git a/basic_plugins/init_plugin_config/init_plugin_info.py b/basic_plugins/init_plugin_config/init_plugin_info.py index 043cebdc..bde3f5e1 100644 --- a/basic_plugins/init_plugin_config/init_plugin_info.py +++ b/basic_plugins/init_plugin_config/init_plugin_info.py @@ -2,23 +2,13 @@ import random from types import ModuleType from typing import Any -from services import logger -from utils.manager import ( - plugin_data_manager, - plugins2settings_manager, - plugins2cd_manager, - plugins2block_manager, - plugins2count_manager, plugins_manager, -) from configs.config import Config -from utils.manager.models import ( - PluginType, - PluginSetting, - PluginCd, - PluginData, - PluginBlock, - PluginCount, Plugin, -) +from services import logger +from utils.manager import (plugin_data_manager, plugins2block_manager, + plugins2cd_manager, plugins2count_manager, + plugins2settings_manager, plugins_manager) +from utils.manager.models import (Plugin, PluginBlock, PluginCd, PluginCount, + PluginData, PluginSetting, PluginType) from utils.utils import get_matchers @@ -31,9 +21,7 @@ def get_attr(module: ModuleType, name: str, default: Any = None) -> Any: :param name: name :param default: default """ - if hasattr(module, name): - return getattr(module, name) - return default + return getattr(module, name) if hasattr(module, name) else default def init_plugin_info(): @@ -42,6 +30,7 @@ def init_plugin_info(): try: plugin = matcher.plugin metadata = plugin.metadata + extra = metadata.extra if metadata else {} if hasattr(plugin, "module"): module = plugin.module plugin_model = matcher.plugin_name @@ -52,13 +41,13 @@ def init_plugin_info(): ) if "[Admin]" in plugin_name: plugin_type = PluginType.ADMIN - plugin_name = plugin_name.replace("[Admin]", "") + plugin_name = plugin_name.replace("[Admin]", "").strip() elif "[Hidden]" in plugin_name: plugin_type = PluginType.HIDDEN - plugin_name = plugin_name.replace("[Hidden]", "") + plugin_name = plugin_name.replace("[Hidden]", "").strip() elif "[Superuser]" in plugin_name: plugin_type = PluginType.SUPERUSER - plugin_name = plugin_name.replace("[Superuser]", "") + plugin_name = plugin_name.replace("[Superuser]", "").strip() else: plugin_type = PluginType.NORMAL plugin_usage = ( @@ -76,9 +65,14 @@ def init_plugin_info(): if plugin_setting: plugin_setting = PluginSetting(**plugin_setting) plugin_setting.plugin_type = menu_type + plugin_superuser_usage = get_attr(module, "__plugin_super_usage__") plugin_task = get_attr(module, "__plugin_task__") - plugin_version = get_attr(module, "__plugin_version__") - plugin_author = get_attr(module, "__plugin_author__") + plugin_version = extra.get("__plugin_version__") or get_attr( + module, "__plugin_version__" + ) + plugin_author = extra.get("__plugin_author__") or get_attr( + module, "__plugin_author__" + ) plugin_cd = get_attr(module, "__plugin_cd_limit__") if plugin_cd: plugin_cd = PluginCd(**plugin_cd) @@ -102,9 +96,7 @@ def init_plugin_info(): plugin_configs = plugin_cfg plugin_status = plugins_manager.get(plugin_model) if not plugin_status: - plugin_status = Plugin( - plugin_name=plugin_model - ) + plugin_status = Plugin(plugin_name=plugin_model) plugin_status.author = plugin_author plugin_status.version = plugin_version plugin_data = PluginData( @@ -112,6 +104,7 @@ def init_plugin_info(): name=plugin_name.strip(), plugin_type=plugin_type, usage=plugin_usage, + superuser_usage=plugin_superuser_usage, des=plugin_des, task=plugin_task, menu_type=menu_type, @@ -121,7 +114,7 @@ def init_plugin_info(): plugin_count=plugin_count, plugin_resources=plugin_resources, plugin_configs=plugin_configs, - plugin_status=plugin_status + plugin_status=plugin_status, ) plugin_data_manager.add_plugin_info(plugin_data) except Exception as e: diff --git a/basic_plugins/super_help/__init__.py b/basic_plugins/super_help/__init__.py index 6dbe286f..b6c29cee 100755 --- a/basic_plugins/super_help/__init__.py +++ b/basic_plugins/super_help/__init__.py @@ -1,18 +1,15 @@ from nonebot import on_command from nonebot.permission import SUPERUSER from nonebot.rule import to_me -from configs.path_config import IMAGE_PATH from utils.message_builder import image -from .data_source import create_help_image +from .data_source import create_help_image, SUPERUSER_HELP_IMAGE __zx_plugin_name__ = '超级用户帮助 [Superuser]' -superuser_help_image = IMAGE_PATH / 'superuser_help.png' - -if superuser_help_image.exists(): - superuser_help_image.unlink() +if SUPERUSER_HELP_IMAGE.exists(): + SUPERUSER_HELP_IMAGE.unlink() super_help = on_command( "超级用户帮助", rule=to_me(), priority=1, permission=SUPERUSER, block=True @@ -21,7 +18,7 @@ super_help = on_command( @super_help.handle() async def _(): - if not superuser_help_image.exists(): + if not SUPERUSER_HELP_IMAGE.exists(): await create_help_image() - x = image(superuser_help_image) + x = image(SUPERUSER_HELP_IMAGE) await super_help.finish(x) diff --git a/basic_plugins/super_help/data_source.py b/basic_plugins/super_help/data_source.py index e7e559a5..3aa47451 100755 --- a/basic_plugins/super_help/data_source.py +++ b/basic_plugins/super_help/data_source.py @@ -1,79 +1,84 @@ -from utils.image_utils import BuildImage -from configs.path_config import IMAGE_PATH -from services.log import logger -from utils.utils import get_matchers -from nonebot.adapters.onebot.v11 import Bot -from nonebot import Driver -import asyncio -import nonebot +import nonebot +from configs.path_config import IMAGE_PATH +from nonebot import Driver +from services.log import logger +from utils.image_template import help_template +from utils.image_utils import (BuildImage, build_sort_image, group_image, + text2image) +from utils.manager import plugin_data_manager +from utils.manager.models import PluginType driver: Driver = nonebot.get_driver() -background = IMAGE_PATH / "background" / "0.png" - -superuser_help_image = IMAGE_PATH / "superuser_help.png" +SUPERUSER_HELP_IMAGE = IMAGE_PATH / "superuser_help.png" @driver.on_bot_connect -async def create_help_image(bot: Bot = None): +async def create_help_image(): """ 创建超级用户帮助图片 """ - await asyncio.get_event_loop().run_in_executor(None, _create_help_image) - - -def _create_help_image(): - """ - 创建管理员帮助图片 - """ - _matchers = get_matchers() - _plugin_name_list = [] - width = 0 - help_str = "超级用户帮助\n\n* 注: ‘*’ 代表可有多个相同参数 ‘?’ 代表可省略该参数 *\n\n" - tmp_img = BuildImage(0, 0, plain_text='1', font_size=24) - for matcher in _matchers: - plugin_name = "" + if SUPERUSER_HELP_IMAGE.exists(): + return + plugin_data_ = plugin_data_manager.get_data() + image_list = [] + task_list = [] + for plugin_data in [ + plugin_data_[x] + for x in plugin_data_ + if plugin_data_[x].name != "超级用户帮助 [Superuser]" + ]: try: - _plugin = nonebot.plugin.get_plugin(matcher.plugin_name) - _module = _plugin.module - try: - plugin_name = _module.__getattribute__("__zx_plugin_name__") - except AttributeError: - continue - is_superuser_usage = False - try: - _ = _module.__getattribute__("__plugin_superuser_usage__") - is_superuser_usage = True - except AttributeError: - pass - if ( - ("[superuser]" in plugin_name.lower() or is_superuser_usage) - and plugin_name != "超级用户帮助 [Superuser]" - and plugin_name not in _plugin_name_list - and "[hidden]" not in plugin_name.lower() - ): - _plugin_name_list.append(plugin_name) - try: - plugin_des = _module.__getattribute__("__plugin_des__") - except AttributeError: - plugin_des = '_' - plugin_cmd = _module.__getattribute__("__plugin_cmd__") - if is_superuser_usage: - plugin_cmd = [x for x in plugin_cmd if "[_superuser]" in x] - plugin_cmd = " / ".join(plugin_cmd).replace('[_superuser]', '').strip() - help_str += f"{plugin_des} -> {plugin_cmd}\n\n" - x = tmp_img.getsize(f"{plugin_des} -> {plugin_cmd}")[0] - width = width if width > x else x + if plugin_data.plugin_type in [PluginType.SUPERUSER, PluginType.ADMIN]: + usage = None + if ( + plugin_data.plugin_type == PluginType.SUPERUSER + and plugin_data.usage + ): + usage = await text2image( + plugin_data.usage, padding=5, color=(204, 196, 151) + ) + if plugin_data.superuser_usage: + usage = await text2image( + plugin_data.superuser_usage, padding=5, color=(204, 196, 151) + ) + if usage: + await usage.acircle_corner() + image = await help_template(plugin_data.name, usage) + image_list.append(image) + if plugin_data.task: + for x in plugin_data.task.keys(): + task_list.append(plugin_data.task[x]) except Exception as e: logger.warning( - f"获取超级用户插件 {matcher.plugin_name}: {plugin_name} 设置失败... {type(e)}:{e}" + f"获取超级用户插件 {plugin_data.model}: {plugin_data.name} 设置失败... {type(e)}:{e}" ) - height = len(help_str.split("\n")) * 33 - width += 500 - A = BuildImage(width, height, font_size=24) - _background = BuildImage(width, height, background=background) - A.text((300, 140), help_str) - A.paste(_background, alpha=True) - A.save(superuser_help_image) - logger.info(f"已成功加载 {len(_plugin_name_list)} 条超级用户命令") + task_str = "\n".join(task_list) + task_str = "通过 开启/关闭 来控制全局被动\n----------\n" + task_str + task_image = await text2image( + task_str, padding=5, color=(204, 196, 151) + ) + task_image = await help_template("被动任务", task_image) + image_list.append(task_image) + image_group, _ = group_image(image_list) + A = await build_sort_image(image_group, color="#f9f6f2", padding_top=180) + await A.apaste( + BuildImage(0, 0, font="CJGaoDeGuo.otf", plain_text="超级用户帮助", font_size=50), + (50, 30), + True, + ) + await A.apaste( + BuildImage( + 0, + 0, + font="CJGaoDeGuo.otf", + plain_text="注: ‘*’ 代表可有多个相同参数 ‘?’ 代表可省略该参数", + font_size=30, + font_color="red", + ), + (50, 90), + True, + ) + await A.asave(SUPERUSER_HELP_IMAGE) + logger.info(f"已成功加载 {len(image_list)} 条超级用户命令") diff --git a/plugins/draw_card/__init__.py b/plugins/draw_card/__init__.py index 73b4a84c..633f5ccf 100644 --- a/plugins/draw_card/__init__.py +++ b/plugins/draw_card/__init__.py @@ -1,11 +1,12 @@ import asyncio import traceback -from cn2an import cn2an from dataclasses import dataclass -from typing import Optional, Set, Tuple +from typing import Optional, Set, Tuple, Any import nonebot -from nonebot import on_regex, on_keyword +from cn2an import cn2an +from configs.config import Config +from nonebot import on_keyword, on_regex from nonebot.adapters.onebot.v11 import MessageEvent from nonebot.log import logger from nonebot.matcher import Matcher @@ -14,8 +15,9 @@ from nonebot.permission import SUPERUSER from nonebot.typing import T_Handler from nonebot_plugin_apscheduler import scheduler -from .handles.base_handle import BaseHandle from .handles.azur_handle import AzurHandle +from .handles.ba_handle import BaHandle +from .handles.base_handle import BaseHandle from .handles.fgo_handle import FgoHandle from .handles.genshin_handle import GenshinHandle from .handles.guardian_handle import GuardianHandle @@ -23,10 +25,6 @@ from .handles.onmyoji_handle import OnmyojiHandle from .handles.pcr_handle import PcrHandle from .handles.pretty_handle import PrettyHandle from .handles.prts_handle import PrtsHandle -from .handles.ba_handle import BaHandle - -from .config import draw_config - __zx_plugin_name__ = "游戏抽卡" __plugin_usage__ = """ @@ -35,6 +33,7 @@ usage: 指令: 原神[1-180]抽: 原神常驻池 原神角色[1-180]抽: 原神角色UP池子 + 原神角色2池[1-180]抽: 原神角色UP池子 原神武器[1-180]抽: 原神武器UP池子 重置原神抽卡: 清空当前卡池的抽卡次数[即从0开始计算UP概率] 方舟[1-300]抽: 方舟卡池,当有当期UP时指向UP池 @@ -106,48 +105,18 @@ class Game: flag: bool max_count: int = 300 # 一次最大抽卡数 reload_time: Optional[int] = None # 重载UP池时间(小时) + has_other_pool: bool = False -games = ( - Game({"azur", "碧蓝航线"}, AzurHandle(), draw_config.AZUR_FLAG), - Game({"fgo", "命运冠位指定"}, FgoHandle(), draw_config.FGO_FLAG), - Game( - {"genshin", "原神"}, - GenshinHandle(), - draw_config.GENSHIN_FLAG, - max_count=180, - reload_time=18, - ), - Game( - {"guardian", "坎公骑冠剑"}, - GuardianHandle(), - draw_config.GUARDIAN_FLAG, - reload_time=4, - ), - Game({"onmyoji", "阴阳师"}, OnmyojiHandle(), draw_config.ONMYOJI_FLAG), - Game({"pcr", "公主连结", "公主连接", "公主链接", "公主焊接"}, PcrHandle(), draw_config.PCR_FLAG), - Game( - {"pretty", "马娘", "赛马娘"}, - PrettyHandle(), - draw_config.PRETTY_FLAG, - max_count=200, - reload_time=4, - ), - Game({"prts", "方舟", "明日方舟"}, PrtsHandle(), draw_config.PRTS_FLAG, reload_time=4), - Game( - {"ba","碧蓝档案"},BaHandle(), - draw_config.BA_FLAG, - max_count=200, - ), -) +games = None def create_matchers(): def draw_handler(game: Game) -> T_Handler: async def handler( - matcher: Matcher, event: MessageEvent, args: Tuple[str, ...] = RegexGroup() + matcher: Matcher, event: MessageEvent, args: Tuple[Any, ...] = RegexGroup() ): - pool_name, num, unit = args + pool_name, pool_type_, num, unit = args if num == "单": num = 1 else: @@ -168,8 +137,11 @@ def create_matchers(): .replace("卡牌", "card") .replace("卡", "card") ) + try: - res = await game.handle.draw(num, pool_name=pool_name, user_id=event.user_id) + if pool_type_ in ["2池", "二池"]: + pool_name = pool_name + "1" + res = game.handle.draw(num, pool_name=pool_name, user_id=event.user_id) except: logger.warning(traceback.format_exc()) await matcher.finish("出错了...") @@ -209,8 +181,11 @@ def create_matchers(): pool_pattern = r"([^\s单0-9零一二三四五六七八九百十]{0,3})" num_pattern = r"(单|[0-9零一二三四五六七八九百十]{1,3})" unit_pattern = r"([抽|井|连])" - draw_regex = r".*?(?:{})\s*{}\s*{}\s*{}".format( - "|".join(game.keywords), pool_pattern, num_pattern, unit_pattern + pool_type = "()" + if game.has_other_pool: + pool_type = r"([2二]池)?" + draw_regex = r".*?(?:{})\s*{}\s*{}\s*{}\s*{}".format( + "|".join(game.keywords), pool_pattern, pool_type, num_pattern, unit_pattern ) update_keywords = {f"更新{keyword}信息" for keyword in game.keywords} reload_keywords = {f"重载{keyword}卡池" for keyword in game.keywords} @@ -220,12 +195,12 @@ def create_matchers(): draw_handler(game) ) on_keyword( - update_keywords, permission=SUPERUSER, priority=1, block=True + update_keywords, priority=1, block=True, permission=SUPERUSER ).append_handler(update_handler(game)) - on_keyword(reload_keywords, priority=1, block=True).append_handler( - reload_handler(game) - ) - on_keyword(reset_keywords, priority=1, block=True).append_handler( + on_keyword( + reload_keywords, priority=1, block=True, permission=SUPERUSER + ).append_handler(reload_handler(game)) + on_keyword(reset_keywords, priority=5, block=True).append_handler( reset_handler(game) ) if game.reload_time: @@ -234,7 +209,7 @@ def create_matchers(): ) -create_matchers() +# create_matchers() # 更新资源 @@ -256,6 +231,65 @@ driver = nonebot.get_driver() @driver.on_startup async def _(): + global games + if not games: + from .config import draw_config + + games = ( + Game( + {"azur", "碧蓝航线"}, + AzurHandle(), + Config.get_config("draw_card", "AZUR_FLAG", True), + ), + Game( + {"fgo", "命运冠位指定"}, + FgoHandle(), + Config.get_config("draw_card", "FGO_FLAG", True), + ), + Game( + {"genshin", "原神"}, + GenshinHandle(), + Config.get_config("draw_card", "GENSHIN_FLAG", True), + max_count=180, + reload_time=18, + has_other_pool=True, + ), + Game( + {"guardian", "坎公骑冠剑"}, + GuardianHandle(), + Config.get_config("draw_card", "GUARDIAN_FLAG", True), + reload_time=4, + ), + Game( + {"onmyoji", "阴阳师"}, + OnmyojiHandle(), + Config.get_config("draw_card", "ONMYOJI_FLAG", True), + ), + Game( + {"pcr", "公主连结", "公主连接", "公主链接", "公主焊接"}, + PcrHandle(), + Config.get_config("draw_card", "PCR_FLAG", True), + ), + Game( + {"pretty", "马娘", "赛马娘"}, + PrettyHandle(), + Config.get_config("draw_card", "PRETTY_FLAG", True), + max_count=200, + reload_time=4, + ), + Game( + {"prts", "方舟", "明日方舟"}, + PrtsHandle(), + Config.get_config("draw_card", "PRTS_FLAG", True), + reload_time=4, + ), + Game( + {"ba", "碧蓝档案"}, + BaHandle(), + Config.get_config("draw_card", "BA_FLAG", True), + ), + ) + create_matchers() tasks = [] for game in games: if game.flag: diff --git a/plugins/draw_card/config.py b/plugins/draw_card/config.py index 742cc515..c166c591 100644 --- a/plugins/draw_card/config.py +++ b/plugins/draw_card/config.py @@ -1,9 +1,9 @@ import nonebot from pathlib import Path from nonebot.log import logger -from pydantic import BaseModel, Extra +from pydantic import BaseModel, Extra, ValidationError from configs.config import Config as AConfig -from configs.path_config import DATA_PATH +from configs.path_config import IMAGE_PATH, DATA_PATH try: import ujson as json @@ -87,28 +87,30 @@ class OnmyojiConfig(BaseModel, extra=Extra.ignore): ONMYOJI_SR: float = 0.2 ONMYOJI_R: float = 0.7875 -#碧蓝档案 + +# 碧蓝档案 class BaConfig(BaseModel, extra=Extra.ignore): BA_THREE_P: float = 0.025 BA_TWO_P: float = 0.185 BA_ONE_P: float = 0.79 BA_G_TWO_P: float = 0.975 + class Config(BaseModel, extra=Extra.ignore): # 开关 - PRTS_FLAG: bool = AConfig.get_config("draw_card", "PRTS_FLAG") - GENSHIN_FLAG: bool = AConfig.get_config("draw_card", "GENSHIN_FLAG") - PRETTY_FLAG: bool = AConfig.get_config("draw_card", "PRETTY_FLAG") - GUARDIAN_FLAG: bool = AConfig.get_config("draw_card", "GUARDIAN_FLAG") - PCR_FLAG: bool = AConfig.get_config("draw_card", "PCR_FLAG") - AZUR_FLAG: bool = AConfig.get_config("draw_card", "AZUR_FLAG") - FGO_FLAG: bool = AConfig.get_config("draw_card", "FGO_FLAG") - ONMYOJI_FLAG: bool = AConfig.get_config("draw_card", "ONMYOJI_FLAG") - BA_FLAG: bool = AConfig.get_config("draw_card", "BA_FLAG") + PRTS_FLAG: bool = True + GENSHIN_FLAG: bool = True + PRETTY_FLAG: bool = True + GUARDIAN_FLAG: bool = True + PCR_FLAG: bool = True + AZUR_FLAG: bool = True + FGO_FLAG: bool = True + ONMYOJI_FLAG: bool = True + BA_FLAG: bool = True # 其他配置 - PCR_TAI: bool = AConfig.get_config("draw_card", "PCR_TAI") - SEMAPHORE: int = AConfig.get_config("draw_card", "SEMAPHORE") + PCR_TAI: bool = True + SEMAPHORE: int = 5 # 抽卡概率 prts: PrtsConfig = PrtsConfig() @@ -124,12 +126,16 @@ class Config(BaseModel, extra=Extra.ignore): driver = nonebot.get_driver() -DRAW_PATH = DATA_PATH / "draw_card" -config_path = DRAW_PATH / "draw_card_config" / "draw_card_config.json" +# DRAW_PATH = Path("data/draw_card").absolute() +DRAW_PATH = IMAGE_PATH / "draw_card" +# try: +# DRAW_PATH = Path(global_config.draw_path).absolute() +# except: +# pass +config_path = DATA_PATH / "draw_card" / "draw_card_config" / "draw_card_config.json" draw_config: Config = Config() - for game_flag, game_name in zip( [ "PRTS_FLAG", @@ -143,7 +149,7 @@ for game_flag, game_name in zip( "PCR_TAI", "BA_FLAG" ], - ["明日方舟", "原神", "赛马娘", "坎公骑冠剑", "公主连结", "碧蓝航线", "命运-冠位指定(FGO)", "阴阳师", "pcr台服卡池","碧蓝档案"], + ["明日方舟", "原神", "赛马娘", "坎公骑冠剑", "公主连结", "碧蓝航线", "命运-冠位指定(FGO)", "阴阳师", "pcr台服卡池", "碧蓝档案"], ): AConfig.add_plugin_config( "draw_card", @@ -161,11 +167,19 @@ AConfig.add_plugin_config( @driver.on_startup def check_config(): global draw_config - draw_config = Config() + if not config_path.exists(): config_path.parent.mkdir(parents=True, exist_ok=True) draw_config = Config() logger.warning("draw_card:配置文件不存在,已重新生成配置文件.....") + else: + with config_path.open("r", encoding="utf8") as fp: + data = json.load(fp) + try: + draw_config = Config.parse_obj({**data}) + except ValidationError: + draw_config = Config() + logger.warning("draw_card:配置文件格式错误,已重新生成配置文件.....") with config_path.open("w", encoding="utf8") as fp: json.dump( diff --git a/plugins/draw_card/count_manager.py b/plugins/draw_card/count_manager.py index 9a930879..7768b057 100644 --- a/plugins/draw_card/count_manager.py +++ b/plugins/draw_card/count_manager.py @@ -1,15 +1,16 @@ -from typing import Optional +from typing import Optional, TypeVar, Generic from pydantic import BaseModel -import time +from cachetools import TTLCache class BaseUserCount(BaseModel): - count: int = 1 # 当前抽卡次数 - time_: int = time.time() # 抽卡时间,当超过一定时间时将重置抽卡次数 - timeout: int = 60 # 超时时间60秒 + count: int = 0 # 当前抽卡次数 -class DrawCountManager: +TCount = TypeVar("TCount", bound="BaseUserCount") + + +class DrawCountManager(Generic[TCount]): """ 抽卡统计保底 """ @@ -30,19 +31,28 @@ class DrawCountManager: """ # 只有保底 - self._data = {} + # 超过60秒重置抽卡次数 + self._data: TTLCache[int, TCount] = TTLCache(maxsize=1000, ttl=60) self._guarantee_tuple = game_draw_count_rule self._star2name = star2name self._max_draw_count = max_draw_count + @classmethod + def get_count_class(cls) -> TCount: + raise NotImplementedError + + def _get_count(self, key: int) -> TCount: + if self._data.get(key) is None: + self._data[key] = self.get_count_class() + else: + self._data[key] = self._data[key] + return self._data[key] + def increase(self, key: int, value: int = 1): """ 用户抽卡次数加1 """ - if self._data.get(key) is None: - self._data[key] = BaseUserCount() - else: - self._data[key].count += 1 + self._get_count(key).count += value def get_max_guarantee(self): """ @@ -54,103 +64,74 @@ class DrawCountManager: """ 获取当前抽卡次数 """ - return self._data[key].count - - def update_time(self, key: int): - """ - 更新抽卡时间 - """ - self._data[key].time_ = time.time() + return self._get_count(key).count def reset(self, key: int): """ 清空记录 """ - del self._data[key] + self._data.pop(key, None) -class GenshinCountManager(DrawCountManager): - class UserCount(BaseUserCount): - five_index: int = 0 # 获取五星时的抽卡次数 - four_index: int = 0 # 获取四星时的抽卡次数 - is_up: bool = False # 下次五星是否必定为up +class GenshinUserCount(BaseUserCount): + five_index: int = 0 # 获取五星时的抽卡次数 + four_index: int = 0 # 获取四星时的抽卡次数 + is_up: bool = False # 下次五星是否必定为up - def increase(self, key: int, value: int = 1): - """ - 用户抽卡次数加1 - """ - if self._data.get(key) is None: - self._data[key] = self.UserCount() - else: - self._data[key].count += 1 + +class GenshinCountManager(DrawCountManager[GenshinUserCount]): + @classmethod + def get_count_class(cls) -> GenshinUserCount: + return GenshinUserCount() def set_is_up(self, key: int, value: bool): """ 设置下次是否必定up """ - self._data[key].is_up = value + self._get_count(key).is_up = value def is_up(self, key: int) -> bool: """ 判断该次保底是否必定为up """ - return self._data[key].is_up + return self._get_count(key).is_up def get_user_five_index(self, key: int) -> int: """ 获取用户上次获取五星的次数 """ - return self._data[key].five_index + return self._get_count(key).five_index def get_user_four_index(self, key: int) -> int: """ 获取用户上次获取四星的次数 """ - return self._data[key].four_index + return self._get_count(key).four_index def mark_five_index(self, key: int): """ 标记用户该次次数为五星 """ - self._data[key].five_index = self._data[key].count + self._get_count(key).five_index = self._get_count(key).count def mark_four_index(self, key: int): """ 标记用户该次次数为四星 """ - self._data[key].four_index = self._data[key].count - - def check_timeout(self, key: int): - """ - 检查用户距离上次抽卡是否超时 - """ - if key in self._data.keys() and self._is_timeout(key): - del self._data[key] + self._get_count(key).four_index = self._get_count(key).count def check_count(self, key: int, count: int): """ 检查用户该次抽卡次数累计是否超过最大限制次数 """ - if ( - key in self._data.keys() - and self._data[key].count + count > self._max_draw_count - ): - del self._data[key] - - def _is_timeout(self, key: int) -> bool: - return time.time() - self._data[key].time_ > self._data[key].timeout + if self._get_count(key).count + count > self._max_draw_count: + self._data.pop(key, None) def get_user_guarantee_count(self, key: int) -> int: + user = self._get_count(key) return ( self.get_max_guarantee() - - ( - ( - self._data[key].count % self.get_max_guarantee() - if self._data[key].count > 0 - else 0 - ) - - self._data[key].five_index - ) + - (user.count % self.get_max_guarantee() - user.five_index) ) % self.get_max_guarantee() or self.get_max_guarantee() def check(self, key: int) -> Optional[int]: @@ -158,12 +139,11 @@ class GenshinCountManager(DrawCountManager): 是否保底 """ # print(self._data) - user: GenshinCountManager.UserCount = self._data[key] + user = self._get_count(key) if user.count - user.five_index == 90: - user.five_index = 90 + user.five_index = user.count return 5 if user.count - user.four_index == 10: user.four_index = user.count return 4 return None - diff --git a/plugins/draw_card/handles/azur_handle.py b/plugins/draw_card/handles/azur_handle.py index dd5ae52e..0ae2dade 100644 --- a/plugins/draw_card/handles/azur_handle.py +++ b/plugins/draw_card/handles/azur_handle.py @@ -1,17 +1,17 @@ -import contextlib import random import dateparser from lxml import etree from typing import List, Optional, Tuple +from PIL import ImageDraw from urllib.parse import unquote from pydantic import ValidationError from nonebot.log import logger -from nonebot.adapters.onebot.v11 import Message -from utils.message_builder import image +from nonebot.adapters.onebot.v11 import Message, MessageSegment +from utils.message_builder import image from .base_handle import BaseHandle, BaseData, UpEvent as _UpEvent, UpChar as _UpChar from ..config import draw_config -from ..util import remove_prohibited_str, cn2py +from ..util import remove_prohibited_str, cn2py, load_font from utils.image_utils import BuildImage try: @@ -59,19 +59,25 @@ class AzurHandle(BaseHandle[AzurChar]): # print(up_ship) acquire_char = None if up_ship and up_pool_flag: - up_zoom = [(0, up_ship[0].zoom / 100)] + up_zoom: List[Tuple[float, float]] = [(0, up_ship[0].zoom / 100)] # 初始化概率 cur_ = up_ship[0].zoom / 100 for i in range(len(up_ship)): - with contextlib.suppress(IndexError): + try: up_zoom.append((cur_, cur_ + up_ship[i + 1].zoom / 100)) cur_ += up_ship[i + 1].zoom / 100 + except IndexError: + pass rand = random.random() # 抽取up for i, zoom in enumerate(up_zoom): if zoom[0] <= rand <= zoom[1]: - with contextlib.suppress(IndexError): - acquire_char = [x for x in self.ALL_CHAR if x.name == up_ship[i].name][0] + try: + acquire_char = [ + x for x in self.ALL_CHAR if x.name == up_ship[i].name + ][0] + except IndexError: + pass # 没有up或者未抽取到up if not acquire_char: star = self.get_star( @@ -83,17 +89,19 @@ class AzurHandle(BaseHandle[AzurChar]): self.config.AZUR_ONE_P, ], ) - acquire_char = random.choice([ - x - for x in self.ALL_CHAR - if x.star == star and x.type_ in type_ and not x.limited - ]) + acquire_char = random.choice( + [ + x + for x in self.ALL_CHAR + if x.star == star and x.type_ in type_ and not x.limited + ] + ) return acquire_char - async def draw(self, count: int, **kwargs) -> Message: + def draw(self, count: int, **kwargs) -> Message: index2card = self.get_cards(count, **kwargs) cards = [card[0] for card in index2card] - up_list = [x.name for x in self.UP_EVENT.up_char] if self.UP_EVENT.up_char else [] + up_list = [x.name for x in self.UP_EVENT.up_char] if self.UP_EVENT else [] result = self.format_result(index2card, **{**kwargs, "up_list": up_list}) return image(b64=self.generate_img(cards).pic2bs4()) + result @@ -103,22 +111,25 @@ class AzurHandle(BaseHandle[AzurChar]): sep_b = 20 w = 100 h = 100 - bg = BuildImage(w + sep_w * 2, h + sep_t + sep_b, font="msyh.ttf") - frame_path = self.img_path / f"{card.star}_star.png" + bg = BuildImage(w + sep_w * 2, h + sep_t + sep_b) + frame_path = str(self.img_path / f"{card.star}_star.png") frame = BuildImage(w, h, background=frame_path) - img_path = self.img_path / f"{cn2py(card.name)}.png" + img_path = str(self.img_path / f"{cn2py(card.name)}.png") img = BuildImage(w, h, background=img_path) # 加圆角 + frame.circle_corner(6) img.circle_corner(6) bg.paste(img, (sep_w, sep_t), alpha=True) bg.paste(frame, (sep_w, sep_t), alpha=True) - bg.circle_corner(6) # 加名字 text = card.name[:6] + "..." if len(card.name) > 7 else card.name - text_w, text_h = bg.getsize(text) - bg.text( + font = load_font(fontsize=14) + text_w, text_h = font.getsize(text) + draw = ImageDraw.Draw(bg.markImg) + draw.text( (sep_w + (w - text_w) / 2, h + sep_t + (sep_b - text_h) / 2), text, + font=font, fill=["#808080", "#3b8bff", "#8000ff", "#c90", "#ee494c"][card.star - 1], ) return bg @@ -146,6 +157,7 @@ class AzurHandle(BaseHandle[AzurChar]): if self.UP_EVENT: data = {"char": json.loads(self.UP_EVENT.json())} self.dump_data(data, f"draw_card_up/{self.game_name}_up_char.json") + self.dump_data(data, f"draw_card_up/{self.game_name}_up_char.json") async def _update_info(self): info = {} @@ -220,17 +232,22 @@ class AzurHandle(BaseHandle[AzurChar]): @staticmethod def parse_star(star: str) -> int: - if star in {"舰娘头像外框普通.png", "舰娘头像外框白色.png"}: + if star in ["舰娘头像外框普通.png", "舰娘头像外框白色.png"]: return 1 - elif star in {"舰娘头像外框稀有.png", "舰娘头像外框蓝色.png"}: + elif star in ["舰娘头像外框稀有.png", "舰娘头像外框蓝色.png"]: return 2 - elif star in {"舰娘头像外框精锐.png", "舰娘头像外框紫色.png"}: + elif star in ["舰娘头像外框精锐.png", "舰娘头像外框紫色.png"]: return 3 - elif star in {"舰娘头像外框超稀有.png", "舰娘头像外框金色.png"}: + elif star in ["舰娘头像外框超稀有.png", "舰娘头像外框金色.png"]: return 4 - elif star in {"舰娘头像外框海上传奇.png", "舰娘头像外框彩色.png"}: + elif star in ["舰娘头像外框海上传奇.png", "舰娘头像外框彩色.png"]: return 5 - elif star in {"舰娘头像外框最高方案.png", "舰娘头像外框决战方案.png", "舰娘头像外框超稀有META.png", "舰娘头像外框精锐META.png"}: + elif star in [ + "舰娘头像外框最高方案.png", + "舰娘头像外框决战方案.png", + "舰娘头像外框超稀有META.png", + "舰娘头像外框精锐META.png", + ]: return 6 else: return 6 diff --git a/plugins/draw_card/handles/ba_handle.py b/plugins/draw_card/handles/ba_handle.py index 9f473f88..5fe00462 100644 --- a/plugins/draw_card/handles/ba_handle.py +++ b/plugins/draw_card/handles/ba_handle.py @@ -1,17 +1,17 @@ import random -import json -from utils.http_utils import AsyncHttpx -from lxml import etree from typing import List, Tuple -from PIL import ImageDraw from urllib.parse import unquote -from nonebot.log import logger -from .base_handle import BaseHandle, BaseData -from ..config import draw_config -from ..util import remove_prohibited_str, cn2py, load_font +from lxml import etree +from nonebot.log import logger +from PIL import ImageDraw +from utils.http_utils import AsyncHttpx from utils.image_utils import BuildImage +from ..config import draw_config +from ..util import cn2py, load_font, remove_prohibited_str +from .base_handle import BaseData, BaseHandle + class BaChar(BaseData): pass @@ -68,17 +68,19 @@ class BaHandle(BaseHandle[BaChar]): bar = BuildImage(bar_w, bar_h, color="#6495ED") bg.paste(img, (sep_w, sep_h), alpha=True) bg.paste(bar, (sep_w, img_h - bar_h + sep_h), alpha=True) - if (card.star == 1): + if card.star == 1: star_path = str(self.img_path / "star-1.png") star_w = 15 - elif (card.star == 2): + elif card.star == 2: star_path = str(self.img_path / "star-2.png") star_w = 30 else: star_path = str(self.img_path / "star-3.png") star_w = 45 star = BuildImage(star_w, star_h, background=star_path) - bg.paste(star, (img_w // 2 - 15 * (card.star - 1) // 2, img_h - star_h), alpha=True) + bg.paste( + star, (img_w // 2 - 15 * (card.star - 1) // 2, img_h - star_h), alpha=True + ) text = card.name[:5] + "..." if len(card.name) > 6 else card.name font = load_font(fontsize=14) text_w, text_h = font.getsize(text) @@ -100,78 +102,37 @@ class BaHandle(BaseHandle[BaChar]): ) for key, value in self.load_data().items() ] - + def title2star(self, title: int): - if title == 'Star-3.png': + if title == "Star-3.png": return 3 - elif title == 'Star-2.png': + elif title == "Star-2.png": return 2 else: return 1 - # Bwiki 待恢复 - # async def _update_info(self): - # info = {} - # url = "https://wiki.biligame.com/bluearchive/学生筛选" - # result = await self.get_url(url) - # if not result: - # logger.warning(f"更新 {self.game_name_cn} 出错") - # return - # else: - # dom = etree.HTML(result, etree.HTMLParser()) - # char_list = dom.xpath("//div[@class='filters']/table[2]/tbody/tr") - # for char in char_list: - # try: - # name = char.xpath("./td[2]/a/div/text()")[0] - # avatar = char.xpath("./td[1]/div/div/a/img/@data-src")[0] - # star_pic = char.xpath("./td[4]/img/@alt")[0] - # except IndexError: - # continue - # member_dict = { - # "头像": unquote(str(avatar)), - # "名称": remove_prohibited_str(name), - # "星级": self.title2star(star_pic), - # } - # info[member_dict["名称"]] = member_dict - # self.dump_data(info) - # logger.info(f"{self.game_name_cn} 更新成功") - # # 下载头像 - # for value in info.values(): - # await self.download_img(value["头像"], value["名称"]) - # # 下载星星 - # await self.download_img( - # "https://patchwiki.biligame.com/images/bluearchive/thumb/e/e0/82nj2x9sxko473g7782r14fztd4zyky.png/15px-Star-1.png", - # "star-1", - # ) - # await self.download_img( - # "https://patchwiki.biligame.com/images/bluearchive/thumb/0/0b/msaff2g0zk6nlyl1rrn7n1ri4yobcqc.png/30px-Star-2.png", - # "star-2", - # ) - # await self.download_img( - # "https://patchwiki.biligame.com/images/bluearchive/thumb/8/8a/577yv79x1rwxk8efdccpblo0lozl158.png/46px-Star-3.png", - # "star-3" - # ) - - # 数据来源:SCHALE DB async def _update_info(self): info = {} url = "https://lonqie.github.io/SchaleDB/data/cn/students.min.json?v=49" - result = await AsyncHttpx.get(url) - if result.status_code != 200: + result = (await AsyncHttpx.get(url)).json() + if not result: logger.warning(f"更新 {self.game_name_cn} 出错") return else: - char_list = json.loads(result.text) - for char in char_list: + for char in result: try: - name = char.get("Name") - avatar = "https://lonqie.github.io/SchaleDB/images/student/icon/"+char.get("CollectionTexture")+".png" - star = char.get("StarGrade") + name = char["Name"] + avatar = ( + "https://github.com/lonqie/SchaleDB/raw/main/images/student/icon/" + + char["CollectionTexture"] + + ".png" + ) + star = char["StarGrade"] except IndexError: continue member_dict = { - "头像": unquote(str(avatar)), - "名称": remove_prohibited_str(name), + "头像": avatar, + "名称": name, "星级": star, } info[member_dict["名称"]] = member_dict @@ -191,6 +152,5 @@ class BaHandle(BaseHandle[BaChar]): ) await self.download_img( "https://patchwiki.biligame.com/images/bluearchive/thumb/8/8a/577yv79x1rwxk8efdccpblo0lozl158.png/46px-Star-3.png", - "star-3" + "star-3", ) - \ No newline at end of file diff --git a/plugins/draw_card/handles/base_handle.py b/plugins/draw_card/handles/base_handle.py index ddfc59af..ac0ee79e 100644 --- a/plugins/draw_card/handles/base_handle.py +++ b/plugins/draw_card/handles/base_handle.py @@ -1,18 +1,18 @@ import math +import anyio import random import aiohttp import asyncio -import aiofiles from PIL import Image from datetime import datetime from pydantic import BaseModel, Extra from asyncio.exceptions import TimeoutError from typing import Dict, List, Optional, TypeVar, Generic, Tuple -from nonebot.adapters.onebot.v11 import Message -from configs.path_config import IMAGE_PATH -from utils.message_builder import image +from nonebot.adapters.onebot.v11 import Message, MessageSegment from nonebot.log import logger -import asyncio + +from configs.path_config import DATA_PATH +from utils.message_builder import image try: import ujson as json @@ -50,29 +50,25 @@ class UpEvent(BaseModel): start_time: Optional[datetime] # 开始时间 end_time: Optional[datetime] # 结束时间 up_char: List[UpChar] # up对象 - up_name: str = "" # up名称 TC = TypeVar("TC", bound="BaseData") class BaseHandle(Generic[TC]): - def __init__(self, game_name: str, game_name_cn: str, game_card_color: str = "#ffffff"): + def __init__(self, game_name: str, game_name_cn: str): self.game_name = game_name self.game_name_cn = game_name_cn self.max_star = 1 # 最大星级 - self.data_path = DRAW_PATH - self.img_path = IMAGE_PATH / f"draw_card/{self.game_name}" - self.up_path = DRAW_PATH / "draw_card_up" + self.game_card_color: str = "#ffffff" + self.data_path = DATA_PATH / "draw_card" + self.img_path = DRAW_PATH / f"{self.game_name}" + self.up_path = DATA_PATH / "draw_card" / "draw_card_up" self.img_path.mkdir(parents=True, exist_ok=True) self.up_path.mkdir(parents=True, exist_ok=True) self.data_files: List[str] = [f"{self.game_name}.json"] - self.game_card_color: str = game_card_color - async def draw(self, count: int, **kwargs) -> Message: - return await asyncio.get_event_loop().run_in_executor(None, self._draw, count) - - def _draw(self, count: int, **kwargs) -> Message: + def draw(self, count: int, **kwargs) -> Message: index2card = self.get_cards(count, **kwargs) cards = [card[0] for card in index2card] result = self.format_result(index2card) @@ -173,7 +169,6 @@ class BaseHandle(Generic[TC]): card_imgs: List[BuildImage] = [] for card, num in zip(card_list, num_list): card_img = self.generate_card_img(card) - # 数量 > 1 时加数字上标 if num > 1: label = circled_number(num) @@ -274,7 +269,7 @@ class BaseHandle(Generic[TC]): return True try: async with self.session.get(url, timeout=10) as response: - async with aiofiles.open(str(img_path), "wb") as f: + async with await anyio.open_file(img_path, "wb") as f: await f.write(await response.read()) return True except TimeoutError: diff --git a/plugins/draw_card/handles/genshin_handle.py b/plugins/draw_card/handles/genshin_handle.py index 94016527..71e79544 100644 --- a/plugins/draw_card/handles/genshin_handle.py +++ b/plugins/draw_card/handles/genshin_handle.py @@ -6,10 +6,10 @@ from urllib.parse import unquote from typing import List, Optional, Tuple from pydantic import ValidationError from datetime import datetime, timedelta -from nonebot.adapters.onebot.v11 import Message -from utils.message_builder import image +from nonebot.adapters.onebot.v11 import Message, MessageSegment from nonebot.log import logger -import asyncio + +from utils.message_builder import image try: import ujson as json @@ -37,21 +37,23 @@ class GenshinArms(GenshinData): class GenshinHandle(BaseHandle[GenshinData]): def __init__(self): - super().__init__("genshin", "原神", "#ebebeb") + super().__init__("genshin", "原神") self.data_files.append("genshin_arms.json") self.max_star = 5 + self.game_card_color = "#ebebeb" self.config = draw_config.genshin self.ALL_CHAR: List[GenshinData] = [] self.ALL_ARMS: List[GenshinData] = [] self.UP_CHAR: Optional[UpEvent] = None + self.UP_CHAR_LIST: Optional[UpEvent] = [] self.UP_ARMS: Optional[UpEvent] = None self.count_manager = GenshinCountManager((10, 90), ("4", "5"), 180) # 抽取卡池 def get_card( - self, pool_name: str, mode: int = 1, add: float = 0.0, is_up: bool = False + self, pool_name: str, mode: int = 1, add: float = 0.0, is_up: bool = False, card_index: int = 0 ): """ mode 1:普通抽 2:四星保底 3:五星保底 @@ -74,7 +76,7 @@ class GenshinHandle(BaseHandle[GenshinData]): star = 5 if pool_name == "char": - up_event = self.UP_CHAR + up_event = self.UP_CHAR_LIST[card_index] all_list = self.ALL_CHAR + [ x for x in self.ALL_ARMS if x.star == star and x.star < 5 ] @@ -105,14 +107,13 @@ class GenshinHandle(BaseHandle[GenshinData]): return acquire_char def get_cards( - self, count: int, user_id: int, pool_name: str + self, count: int, user_id: int, pool_name: str, card_index: int = 0 ) -> List[Tuple[GenshinData, int]]: card_list = [] # 获取角色列表 add = 0.0 count_manager = self.count_manager - count_manager.check_timeout(user_id) # 检查上次抽卡次数是否超时 count_manager.check_count(user_id, count) # 检查次数累计 - pool = self.UP_CHAR if pool_name == "char" else self.UP_ARMS + pool = self.UP_CHAR_LIST[card_index] if pool_name == "char" else self.UP_ARMS for i in range(count): count_manager.increase(user_id) star = count_manager.check(user_id) # 是否有四星或五星保底 @@ -123,13 +124,13 @@ class GenshinHandle(BaseHandle[GenshinData]): add += draw_config.genshin.I72_ADD if star: if star == 4: - card = self.get_card(pool_name, 2, add=add) + card = self.get_card(pool_name, 2, add=add, card_index=card_index) else: card = self.get_card( - pool_name, 3, add, count_manager.is_up(user_id) + pool_name, 3, add, count_manager.is_up(user_id), card_index=card_index ) else: - card = self.get_card(pool_name, 1, add, count_manager.is_up(user_id)) + card = self.get_card(pool_name, 1, add, count_manager.is_up(user_id), card_index=card_index) # print(f"{count_manager.get_user_count(user_id)}:", # count_manager.get_user_five_index(user_id), star, card.star, add) # 四星角色 @@ -147,7 +148,6 @@ class GenshinHandle(BaseHandle[GenshinData]): else: count_manager.set_is_up(user_id, False) card_list.append((card, count_manager.get_user_count(user_id))) - count_manager.update_time(user_id) return card_list def generate_card_img(self, card: GenshinData) -> BuildImage: @@ -188,11 +188,11 @@ class GenshinHandle(BaseHandle[GenshinData]): bg.paste(star, (sep_w + int((frame_w - star.width) / 2), sep_h - 6), alpha=True) return bg - def format_pool_info(self, pool_name: str) -> str: + def format_pool_info(self, pool_name: str, card_index: int = 0) -> str: info = "" up_event = None if pool_name == "char": - up_event = self.UP_CHAR + up_event = self.UP_CHAR_LIST[card_index] elif pool_name == "arms": up_event = self.UP_ARMS if up_event: @@ -205,15 +205,18 @@ class GenshinHandle(BaseHandle[GenshinData]): info = f"当前up池:{up_event.title}\n{info}" return info.strip() - async def draw(self, count: int, user_id: int, pool_name: str = "", **kwargs) -> Message: - return await asyncio.get_event_loop().run_in_executor(None, self._draw, count, user_id, pool_name) - - def _draw(self, count: int, user_id: int, pool_name: str = "", **kwargs) -> Message: - index2cards = self.get_cards(count, user_id, pool_name) + def draw(self, count: int, user_id: int, pool_name: str = "", **kwargs) -> Message: + card_index = 0 + if "1" in pool_name: + card_index = 1 + pool_name = pool_name.replace("1", "") + index2cards = self.get_cards(count, user_id, pool_name, card_index) cards = [card[0] for card in index2cards] up_event = None if pool_name == "char": - up_event = self.UP_CHAR + if card_index == 1 and len(self.UP_CHAR_LIST) == 1: + return Message("当前没有第二个角色UP池") + up_event = self.UP_CHAR_LIST[card_index] elif pool_name == "arms": up_event = self.UP_ARMS up_list = [x.name for x in up_event.up_char] if up_event else [] @@ -225,7 +228,7 @@ class GenshinHandle(BaseHandle[GenshinData]): ) result += f"\n距离保底发还剩 {self.count_manager.get_user_guarantee_count(user_id)} 抽" # result += "\n【五星:0.6%,四星:5.1%,第72抽开始五星概率每抽加0.585%】" - pool_info = self.format_pool_info(pool_name) + pool_info = self.format_pool_info(pool_name, card_index) img = self.generate_img(cards) bk = BuildImage(img.w, img.h + 50, font_size=20, color="#ebebeb") bk.paste(img) @@ -255,17 +258,20 @@ class GenshinHandle(BaseHandle[GenshinData]): def load_up_char(self): try: data = self.load_data(f"draw_card_up/{self.game_name}_up_char.json") - self.UP_CHAR = UpEvent.parse_obj(data.get("char", {})) + self.UP_CHAR_LIST.append(UpEvent.parse_obj(data.get("char", {}))) + self.UP_CHAR_LIST.append(UpEvent.parse_obj(data.get("char1", {}))) self.UP_ARMS = UpEvent.parse_obj(data.get("arms", {})) except ValidationError: logger.warning(f"{self.game_name}_up_char 解析出错") def dump_up_char(self): - if self.UP_CHAR and self.UP_ARMS: + if self.UP_CHAR_LIST and self.UP_ARMS: data = { - "char": json.loads(self.UP_CHAR.json()), + "char": json.loads(self.UP_CHAR_LIST[0].json()), "arms": json.loads(self.UP_ARMS.json()), } + if len(self.UP_CHAR_LIST) > 1: + data['char1'] = json.loads(self.UP_CHAR_LIST[1].json()) self.dump_data(data, f"draw_card_up/{self.game_name}_up_char.json") async def _update_info(self): @@ -359,6 +365,7 @@ class GenshinHandle(BaseHandle[GenshinData]): await self.update_up_char() async def update_up_char(self): + self.UP_CHAR_LIST = [] url = "https://wiki.biligame.com/ys/祈愿" result = await self.get_url(url) if not result: @@ -399,14 +406,15 @@ class GenshinHandle(BaseHandle[GenshinData]): for name in star4_list ], ) - if index == 0: - self.UP_CHAR = up_event - elif index == 1: + if '神铸赋形' not in title: + self.UP_CHAR_LIST.append(up_event) + else: self.UP_ARMS = up_event - if self.UP_CHAR and self.UP_ARMS: + if self.UP_CHAR_LIST and self.UP_ARMS: self.dump_up_char() + char_title = " & ".join([x.title for x in self.UP_CHAR_LIST]) logger.info( - f"成功获取{self.game_name_cn}当前up信息...当前up池: {self.UP_CHAR.title} & {self.UP_ARMS.title}" + f"成功获取{self.game_name_cn}当前up信息...当前up池: {char_title} & {self.UP_ARMS.title}" ) except Exception as e: logger.warning(f"{self.game_name_cn}UP更新出错 {type(e)}:{e}") @@ -418,12 +426,23 @@ class GenshinHandle(BaseHandle[GenshinData]): async def _reload_pool(self) -> Optional[Message]: await self.update_up_char() self.load_up_char() - if self.UP_CHAR and self.UP_ARMS: + if self.UP_CHAR_LIST and self.UP_ARMS: + if len(self.UP_CHAR_LIST) > 1: + return Message( + Message.template("重载成功!\n当前UP池子:{} & {} & {}{:image}{:image}{:image}").format( + self.UP_CHAR_LIST[0].title, + self.UP_CHAR_LIST[1].title, + self.UP_ARMS.title, + self.UP_CHAR_LIST[0].pool_img, + self.UP_CHAR_LIST[1].pool_img, + self.UP_ARMS.pool_img, + ) + ) return Message( Message.template("重载成功!\n当前UP池子:{} & {}{:image}{:image}").format( - self.UP_CHAR.title, + char_title, self.UP_ARMS.title, - self.UP_CHAR.pool_img, + self.UP_CHAR_LIST[0].pool_img, self.UP_ARMS.pool_img, ) ) diff --git a/plugins/draw_card/handles/guardian_handle.py b/plugins/draw_card/handles/guardian_handle.py index 9c6766e0..33e9449b 100644 --- a/plugins/draw_card/handles/guardian_handle.py +++ b/plugins/draw_card/handles/guardian_handle.py @@ -7,10 +7,10 @@ from datetime import datetime from urllib.parse import unquote from typing import List, Optional, Tuple from pydantic import ValidationError -from nonebot.adapters.onebot.v11 import Message -from utils.message_builder import image +from nonebot.adapters.onebot.v11 import Message, MessageSegment from nonebot.log import logger -import asyncio + +from utils.message_builder import image try: import ujson as json @@ -137,10 +137,7 @@ class GuardianHandle(BaseHandle[GuardianData]): info = f"当前up池:{up_event.title}\n{info}" return info.strip() - async def draw(self, count: int, pool_name: str, **kwargs) -> Message: - return await asyncio.get_event_loop().run_in_executor(None, self._draw, count, pool_name) - - def _draw(self, count: int, pool_name: str, **kwargs) -> Message: + def draw(self, count: int, pool_name: str, **kwargs) -> Message: index2card = self.get_cards(count, pool_name) cards = [card[0] for card in index2card] up_event = self.UP_CHAR if pool_name == "char" else self.UP_ARMS @@ -238,9 +235,12 @@ class GuardianHandle(BaseHandle[GuardianData]): char_list = dom.xpath("//table[@id='CardSelectTr']/tbody/tr") for char in char_list: try: - name = char.xpath("./td[1]/a/@title")[0] - avatar = char.xpath("./td[1]/a/img/@src")[0] - star = char.xpath("./td[1]/span/img/@alt")[0] + # name = char.xpath("./td[1]/a/@title")[0] + # avatar = char.xpath("./td[1]/a/img/@src")[0] + # star = char.xpath("./td[1]/span/img/@alt")[0] + name = char.xpath("./th[1]/a[1]/@title")[0] + avatar = char.xpath("./th[1]/a/img/@src")[0] + star = char.xpath("./th[1]/span/img/@alt")[0] except IndexError: continue member_dict = { diff --git a/plugins/draw_card/handles/pretty_handle.py b/plugins/draw_card/handles/pretty_handle.py index 29163f01..5558e268 100644 --- a/plugins/draw_card/handles/pretty_handle.py +++ b/plugins/draw_card/handles/pretty_handle.py @@ -8,10 +8,10 @@ from datetime import datetime from urllib.parse import unquote from typing import List, Optional, Tuple from pydantic import ValidationError -from nonebot.adapters.onebot.v11 import Message -from utils.message_builder import image +from nonebot.adapters.onebot.v11 import Message, MessageSegment from nonebot.log import logger -import asyncio + +from utils.message_builder import image try: import ujson as json @@ -40,9 +40,10 @@ class PrettyCard(PrettyData): class PrettyHandle(BaseHandle[PrettyData]): def __init__(self): - super().__init__("pretty", "赛马娘", "#eff2f5") + super().__init__("pretty", "赛马娘") self.data_files.append("pretty_card.json") self.max_star = 3 + self.game_card_color = "#eff2f5" self.config = draw_config.pretty self.ALL_CHAR: List[PrettyChar] = [] @@ -129,10 +130,7 @@ class PrettyHandle(BaseHandle[PrettyData]): info = f"当前up池:{up_event.title}\n{info}" return info.strip() - async def draw(self, count: int, pool_name: str, **kwargs) -> Message: - return await asyncio.get_event_loop().run_in_executor(None, self._draw, count, pool_name) - - def _draw(self, count: int, pool_name: str, **kwargs) -> Message: + def draw(self, count: int, pool_name: str, **kwargs) -> Message: pool_name = "char" if not pool_name else pool_name index2card = self.get_cards(count, pool_name) cards = [card[0] for card in index2card] @@ -331,7 +329,9 @@ class PrettyHandle(BaseHandle[PrettyData]): try: title = announcement.xpath("./@title")[0] url = "https://wiki.biligame.com/" + announcement.xpath("./@href")[0] - if "卡池" in title: + if re.match(r".*?\d{8}$", title) or re.match( + r"^\d{1,2}月\d{1,2}日.*?", title + ): break except IndexError: continue @@ -348,49 +348,48 @@ class PrettyHandle(BaseHandle[PrettyData]): char_img = "" card_img = "" up_chars = [] - up_chars_name = [] up_cards = [] - up_cards_name = [] - dom = etree.HTML(result, etree.HTMLParser()) - content = dom.xpath('//*[@id="mw-content-text"]/div/div/div[1]/div')[0] - heads = content.xpath("./div") + soup = BeautifulSoup(result, "lxml") + heads = soup.find_all("span", {"class": "mw-headline"}) for head in heads: - if "时间" in head.find("./img").tail or "期间" in head.find("./img").tail: - time = content.xpath("./p")[1].text.split("\n")[0] + if "时间" in head.text: + time = head.find_next("p").text.split("\n")[0] if "~" in time: start, end = time.split("~") start_time = dateparser.parse(start) end_time = dateparser.parse(end) - elif "赛马娘" in head.find("./img").tail: - char_img = content.xpath("./center")[1].find("./a/img").get("src") - lines = content.xpath("./p")[2].xpath("string(.)").split("\n") + elif "赛马娘" in head.text: + char_img = head.find_next("a", {"class": "image"}).find("img")[ + "src" + ] + lines = str(head.find_next("p").text).split("\n") chars = [ line for line in lines - if "★" in line and "[" in line and "]" in line + if "★" in line and "(" in line and ")" in line ] - for char in set(chars): # list去重 + for char in chars: star = char.count("★") - name = re.split("[ ]",char)[-1] + name = re.split(r"[()]", char)[-2].strip() up_chars.append( UpChar(name=name, star=star, limited=False, zoom=70) ) - up_chars_name.append(name) - elif "支援卡" in head.find("./img").tail: - card_img = content.xpath("./center")[2].find("./a/img").get("src") - lines = content.xpath("./p")[3].xpath("string(.)").split("\n") + elif "支援卡" in head.text: + card_img = head.find_next("a", {"class": "image"}).find("img")[ + "src" + ] + lines = str(head.find_next("p").text).split("\n") cards = [ line for line in lines - if "R" in line and "[" in line and "]" in line + if "R" in line and "(" in line and ")" in line ] - for card in set(cards): + for card in cards: star = 3 if "SSR" in card else 2 if "SR" in card else 1 - name = re.split("[ ]",card)[-1] + name = re.split(r"[()]", card)[-2].strip() up_cards.append( UpChar(name=name, star=star, limited=False, zoom=70) ) - up_cards_name.append(name) if start_time and end_time: if start_time <= datetime.now() <= end_time: self.UP_CHAR = UpEvent( @@ -399,7 +398,6 @@ class PrettyHandle(BaseHandle[PrettyData]): start_time=start_time, end_time=end_time, up_char=up_chars, - up_name=",".join(up_chars_name), ) self.UP_CARD = UpEvent( title=title, @@ -407,7 +405,6 @@ class PrettyHandle(BaseHandle[PrettyData]): start_time=start_time, end_time=end_time, up_char=up_cards, - up_name=",".join(up_cards_name), ) self.dump_up_char() logger.info(f"成功获取{self.game_name_cn}当前up信息...当前up池: {title}") @@ -419,8 +416,9 @@ class PrettyHandle(BaseHandle[PrettyData]): self.load_up_char() if self.UP_CHAR and self.UP_CARD: return Message( - Message.template("重载成功!\n当前UP池子:{}\n当前支援卡池子:{}").format( - self.UP_CHAR.up_name, - self.UP_CARD.up_name, + Message.template("重载成功!\n当前UP池子:{}{:image}{:image}").format( + self.UP_CHAR.title, + self.UP_CHAR.pool_img, + self.UP_CARD.pool_img, ) ) diff --git a/plugins/draw_card/handles/prts_handle.py b/plugins/draw_card/handles/prts_handle.py index 4d0e20b8..aac9db13 100644 --- a/plugins/draw_card/handles/prts_handle.py +++ b/plugins/draw_card/handles/prts_handle.py @@ -1,16 +1,17 @@ -import re import random -import dateparser -from lxml import etree -from PIL import ImageDraw +import re from datetime import datetime -from urllib.parse import unquote from typing import List, Optional, Tuple -from pydantic import ValidationError -from nonebot.adapters.onebot.v11 import Message -from utils.message_builder import image +from urllib.parse import unquote + +import dateparser +from PIL import ImageDraw +from lxml import etree +from nonebot.adapters.onebot.v11 import Message, MessageSegment from nonebot.log import logger -import asyncio +from pydantic import ValidationError + +from utils.message_builder import image try: import ujson as json @@ -104,17 +105,18 @@ class PrtsHandle(BaseHandle[Operator]): info = f"当前up池: {self.UP_EVENT.title}\n{info}" return info.strip() - async def draw(self, count: int, **kwargs) -> Message: - return await asyncio.get_event_loop().run_in_executor(None, self._draw, count) - - def _draw(self, count: int, **kwargs) -> Message: + def draw(self, count: int, **kwargs) -> Message: index2card = self.get_cards(count) """这里cards修复了抽卡图文不符的bug""" cards = [card[0] for card in index2card] up_list = [x.name for x in self.UP_EVENT.up_char] if self.UP_EVENT else [] result = self.format_result(index2card, up_list=up_list) pool_info = self.format_pool_info() - return pool_info + image(b64=self.generate_img(cards).pic2bs4()) + result + return ( + pool_info + + image(b64=self.generate_img(cards).pic2bs4()) + + result + ) def generate_card_img(self, card: Operator) -> BuildImage: sep_w = 5 @@ -177,6 +179,7 @@ class PrtsHandle(BaseHandle[Operator]): self.dump_data(data, f"draw_card_up/{self.game_name}_up_char.json") async def _update_info(self): + """更新信息""" info = {} url = "https://wiki.biligame.com/arknights/干员数据表" result = await self.get_url(url) @@ -288,10 +291,7 @@ class PrtsHandle(BaseHandle[Operator]): up_chars.append( UpChar(name=name, star=star, limited=False, zoom=zoom) ) - if start_time <= datetime.now() <= end_time: # 跳过过时的卡池 - break # 这里break会导致个问题:如果一个公告里有两个池子,会漏掉下面的池子,比如 5.19 的定向寻访。但目前我也没啥好想法解决 - chars: List[str] = [] # 查找下一个卡池之前清空 chars - up_chars = [] # 查找下一个卡池之前清空 up_chars + break # 这里break会导致个问题:如果一个公告里有两个池子,会漏掉下面的池子,比如 5.19 的定向寻访。但目前我也没啥好想法解决 if title and start_time and end_time: if start_time <= datetime.now() <= end_time: self.UP_EVENT = UpEvent( @@ -309,6 +309,6 @@ class PrtsHandle(BaseHandle[Operator]): await self.update_up_char() self.load_up_char() if self.UP_EVENT: - return f"重载成功!\n当前UP池子:{self.UP_EVENT.title}" + image( + return f"重载成功!\n当前UP池子:{self.UP_EVENT.title}" + MessageSegment.image( self.UP_EVENT.pool_img ) diff --git a/plugins/draw_card/rule.py b/plugins/draw_card/rule.py deleted file mode 100644 index e69de29b..00000000 diff --git a/plugins/draw_card/util.py b/plugins/draw_card/util.py index a234cb68..860e35eb 100644 --- a/plugins/draw_card/util.py +++ b/plugins/draw_card/util.py @@ -1,14 +1,19 @@ import platform import pypinyin +from pathlib import Path from PIL.ImageFont import FreeTypeFont from PIL import Image, ImageDraw, ImageFont from PIL.Image import Image as IMG + from configs.path_config import FONT_PATH +dir_path = Path(__file__).parent.absolute() + def cn2py(word) -> str: + """保存声调,防止出现类似方舟干员红与吽拼音相同声调不同导致红照片无法保存的问题""" temp = "" - for i in pypinyin.pinyin(word, style=pypinyin.NORMAL): + for i in pypinyin.pinyin(word, style=pypinyin.Style.TONE3): temp += "".join(i) return temp @@ -26,9 +31,9 @@ def remove_prohibited_str(name: str) -> str: return name -def load_font(font_name: str = "msyh.ttf", fontsize: int = 16) -> FreeTypeFont: +def load_font(fontname: str = "msyh.ttf", fontsize: int = 16) -> FreeTypeFont: return ImageFont.truetype( - str(FONT_PATH / f"{font_name}"), fontsize, encoding="utf-8" + str(FONT_PATH / f"{fontname}"), fontsize, encoding="utf-8" ) diff --git a/plugins/translate/data_source.py b/plugins/translate/data_source.py index 072a80cc..a5354b02 100755 --- a/plugins/translate/data_source.py +++ b/plugins/translate/data_source.py @@ -1,22 +1,56 @@ from utils.http_utils import AsyncHttpx +import time +import random +from hashlib import md5 -url = f"http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule&smartresult=ugc&sessionFrom=null" +# url = f"http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule&smartresult=ugc&sessionFrom=null" +url = f"https://fanyi-api.baidu.com/api/trans/vip/translate" -async def translate_msg(language_type, msg): +# 获取lts时间戳,salt加密盐,sign加密签名 +def get_lts_salt_sign(word): + lts = str(int(time.time() * 10000)) + salt = lts + str(random.randint(0, 9)) + string = "fanyideskweb" + word + salt + "Ygy_4c=r#e#4EX^NUGUc5" + s = md5() + # md5的加密串必须为字节码 + s.update(string.encode()) + # 16进制加密 + sign = s.hexdigest() + print(lts, salt, sign) + return lts, salt, sign + +async def translate_msg(language_type, word): + # data = { + # "type": parse_language(language_type), + # "i": msg, + # "doctype": "json", + # "version": "2.1", + # "keyfrom": "fanyi.web", + # "ue": "UTF-8", + # "action": "FY_BY_CLICKBUTTON", + # "typoResult": "true", + # } + lts, salt, sign = get_lts_salt_sign(word) data = { "type": parse_language(language_type), - "i": msg, - "doctype": "json", - "version": "2.1", - "keyfrom": "fanyi.web", - "ue": "UTF-8", - "action": "FY_BY_CLICKBUTTON", - "typoResult": "true", + 'i': word, + 'from': 'AUTO', + 'to': 'AUTO', + 'smartresult': 'dict', + 'client': 'fanyideskweb', + 'salt': salt, + 'sign': sign, + 'lts': lts, + 'bv': 'bdc0570a34c12469d01bfac66273680d', + 'doctype': 'json', + 'version': '2.1', + 'keyfrom': 'fanyi.web', + 'action': 'FY_BY_REALTlME' } data = (await AsyncHttpx.post(url, data=data)).json() if data["errorCode"] == 0: - return f'原文:{msg}\n翻译:{data["translateResult"][0][0]["tgt"]}' + return f'原文:{word}\n翻译:{data["translateResult"][0][0]["tgt"]}' return "翻译惜败.." diff --git a/pyproject.toml b/pyproject.toml index b251c025..d7a4b9da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,8 @@ rich = "^12.4.3" gino = "^1.0.1" nonebot-adapter-onebot = "^2.1.5" nonebot-plugin-apscheduler = "^0.2.0" +nonebot-plugin-htmlrender = "^0.2.0" +cachetools = "^5.2.0" [tool.poetry.dev-dependencies] diff --git a/utils/image_template.py b/utils/image_template.py new file mode 100644 index 00000000..caa93698 --- /dev/null +++ b/utils/image_template.py @@ -0,0 +1,35 @@ +from typing import Tuple, List, Literal + +from utils.image_utils import BuildImage, text2image + + +async def help_template(title: str, usage: BuildImage) -> BuildImage: + """ + 说明: + 生成单个功能帮助模板 + 参数: + :param title: 标题 + :param usage: 说明图片 + """ + title_image = BuildImage( + 0, + 0, + font_size=35, + plain_text=title, + font_color=(255, 255, 255), + font="CJGaoDeGuo.otf", + ) + background_image = BuildImage( + max(title_image.w, usage.w) + 50, + max(title_image.h, usage.h) + 100, + color=(114, 138, 204), + ) + await background_image.apaste(usage, (25, 80), True) + await background_image.apaste(title_image, (25, 20), True) + await background_image.aline( + (25, title_image.h + 22, 25 + title_image.w, title_image.h + 22), + (204, 196, 151), + 3, + ) + return background_image + diff --git a/utils/image_utils.py b/utils/image_utils.py index 67506afd..d819cc17 100755 --- a/utils/image_utils.py +++ b/utils/image_utils.py @@ -1,20 +1,21 @@ import asyncio import base64 +import os import random import re -import time +import uuid from io import BytesIO from math import ceil from pathlib import Path -from typing import List, Literal, Optional, Tuple, Union -import uuid +from typing import List, Literal, Optional, Tuple, Union, Callable, Awaitable +from nonebot.utils import is_coroutine_callable + import cv2 import imagehash from configs.path_config import FONT_PATH, IMAGE_PATH from imagehash import ImageHash from matplotlib import pyplot as plt from PIL import Image, ImageDraw, ImageFile, ImageFilter, ImageFont - from services import logger ImageFile.LOAD_TRUNCATED_IMAGES = True @@ -159,6 +160,7 @@ class BuildImage: is_alpha: bool = False, plain_text: Optional[str] = None, font_color: Optional[Union[str, Tuple[int, int, int]]] = None, + **kwargs ): """ 参数: @@ -693,7 +695,11 @@ class BuildImage: except ValueError: pass - async def acircle_corner(self, radii: int = 30, point_list: List[Literal["lt", "rt", "lb", "lr"]] = None): + async def acircle_corner( + self, + radii: int = 30, + point_list: List[Literal["lt", "rt", "lb", "rb"]] = ["lt", "rt", "lb", "rb"], + ): """ 说明: 异步 矩形四角变圆 @@ -703,7 +709,11 @@ class BuildImage: """ await self.loop.run_in_executor(None, self.circle_corner, radii, point_list) - def circle_corner(self, radii: int = 30, point_list: List[Literal["lt", "rt", "lb", "lr"]] = None): + def circle_corner( + self, + radii: int = 30, + point_list: List[Literal["lt", "rt", "lb", "rb"]] = ["lt", "rt", "lb", "rb"], + ): """ 说明: 矩形四角变圆 @@ -711,15 +721,13 @@ class BuildImage: :param radii: 半径 :param point_list: 需要变化的角 """ - if not point_list: - point_list = ["lt", "rt", "lb", "rb"] # 画圆(用于分离4个角) + img = self.markImg.convert("RGBA") + alpha = img.split()[-1] circle = Image.new("L", (radii * 2, radii * 2), 0) draw = ImageDraw.Draw(circle) - draw.ellipse((0, 0, radii * 2, radii * 2), fill=255) - self.markImg = self.markImg.convert("RGBA") - w, h = self.markImg.size - alpha = Image.new("L", self.markImg.size, 255) + draw.ellipse((0, 0, radii * 2, radii * 2), fill=255) # 黑色方形内切白色圆形 + w, h = img.size if "lt" in point_list: alpha.paste(circle.crop((0, 0, radii, radii)), (0, 0)) if "rt" in point_list: @@ -728,9 +736,11 @@ class BuildImage: alpha.paste(circle.crop((0, radii, radii, radii * 2)), (0, h - radii)) if "rb" in point_list: alpha.paste( - circle.crop((radii, radii, radii * 2, radii * 2)), (w - radii, h - radii) + circle.crop((radii, radii, radii * 2, radii * 2)), + (w - radii, h - radii), ) - self.markImg.putalpha(alpha) + img.putalpha(alpha) + self.markImg = img self.draw = ImageDraw.Draw(self.markImg) async def arotate(self, angle: int, expand: bool = False): @@ -1518,7 +1528,16 @@ async def text2image( w, _ = _tmp.getsize(x.strip() or "正") height += h + line_height width = width if width > w else w - image_list.append(BuildImage(w, h, font=font, font_size=font_size, plain_text=x.strip(), color=color)) + image_list.append( + BuildImage( + w, + h, + font=font, + font_size=font_size, + plain_text=x.strip(), + color=color, + ) + ) width += pw height += ph A = BuildImage( @@ -1533,5 +1552,143 @@ async def text2image( return A +def group_image(image_list: List[BuildImage]) -> Tuple[List[List[BuildImage]], int]: + """ + 说明: + 根据图片大小进行分组 + 参数: + :param image_list: 排序图片列表 + """ + image_list.sort(key=lambda x: x.h, reverse=True) + max_image = max(image_list, key=lambda x: x.h) + + image_list.remove(max_image) + max_h = max_image.h + total_w = 0 + + # 图片分组 + image_group = [[max_image]] + is_use = [] + surplus_list = image_list[:] + + for image in image_list: + if image.uid not in is_use: + group = [image] + is_use.append(image.uid) + curr_h = image.h + while True: + surplus_list = [x for x in surplus_list if x.uid not in is_use] + for tmp in surplus_list: + temp_h = curr_h + tmp.h + 10 + if temp_h < max_h or abs(max_h - temp_h) < 100: + curr_h += tmp.h + 15 + is_use.append(tmp.uid) + group.append(tmp) + break + else: + break + total_w += max([x.w for x in group]) + 15 + image_group.append(group) + while surplus_list: + surplus_list = [x for x in surplus_list if x.uid not in is_use] + if not surplus_list: + break + surplus_list.sort(key=lambda x: x.h, reverse=True) + for img in surplus_list: + if img.uid not in is_use: + _w = 0 + index = -1 + for i, ig in enumerate(image_group): + if s := sum([x.h for x in ig]) > _w: + _w = s + index = i + if index != -1: + image_group[index].append(img) + is_use.append(img.uid) + + max_h = 0 + max_w = 0 + for ig in image_group: + if (_h := sum([x.h + 15 for x in ig])) > max_h: + max_h = _h + max_w += max([x.w for x in ig]) + 30 + is_use.clear() + while abs(max_h - max_w) > 200 and len(image_group) - 1 >= len(image_group[-1]): + for img in image_group[-1]: + _min_h = 999999 + _min_index = -1 + for i, ig in enumerate(image_group): + # if i not in is_use and (_h := sum([x.h for x in ig]) + img.h) > _min_h: + if (_h := sum([x.h for x in ig]) + img.h) < _min_h: + _min_h = _h + _min_index = i + is_use.append(_min_index) + image_group[_min_index].append(img) + max_w -= max([x.w for x in image_group[-1]]) - 30 + image_group.pop(-1) + max_h = max([sum([x.h + 15 for x in ig]) for ig in image_group]) + return image_group, max(max_h + 250, max_w + 70) + + +async def build_sort_image( + image_group: List[List[BuildImage]], + h: Optional[int] = None, + padding_top: int = 200, + color: Union[str, Tuple[int, int, int], Tuple[int, int, int, int]] = (255, 255, 255), + background_path: Optional[Path] = None, + background_handle: Callable[[BuildImage], Optional[Awaitable]] = None, +) -> BuildImage: + """ + 说明: + 对group_image的图片进行组装 + 参数: + :param image_group: 分组图片列表 + :param h: max(宽,高),一般为group_image的返回值,有值时,图片必定为正方形 + :param padding_top: 图像列表与最顶层间距 + :param color: 背景颜色 + :param background_path: 背景图片文件夹路径(随机) + :param background_handle: 背景图额外操作 + """ + bk_file = None + if background_path: + random_bk = os.listdir(background_path) + if random_bk: + bk_file = random.choice(random_bk) + image_w = 0 + image_h = 0 + if not h: + for ig in image_group: + _w = max([x.w + 30 for x in ig]) + image_w += _w + 30 + _h = sum([x.h + 10 for x in ig]) + if _h > image_h: + image_h = _h + image_h += padding_top + else: + image_w = h + image_h = h + A = BuildImage( + image_w, + image_h, + font_size=24, + font="CJGaoDeGuo.otf", + color=color, + background=(background_path / bk_file) if bk_file else None, + ) + if background_handle: + if is_coroutine_callable(background_handle): + await background_handle(A) + else: + background_handle(A) + curr_w = 50 + for ig in image_group: + curr_h = padding_top - 20 + for img in ig: + await A.apaste(img, (curr_w, curr_h), True) + curr_h += img.h + 10 + curr_w += max([x.w for x in ig]) + 30 + return A + + if __name__ == "__main__": pass diff --git a/utils/manager/data_class.py b/utils/manager/data_class.py index 5744b3a7..d18c6ded 100755 --- a/utils/manager/data_class.py +++ b/utils/manager/data_class.py @@ -1,16 +1,17 @@ -from typing import Union, Optional, TypeVar, Generic, Dict, NoReturn, Any -from pathlib import Path -from ruamel.yaml import YAML -from ruamel import yaml -import ujson as json import copy +from pathlib import Path +from typing import Any, Dict, Generic, NoReturn, Optional, TypeVar, Union + +import ujson as json +from ruamel import yaml +from ruamel.yaml import YAML from .models import * _yaml = YAML(typ="safe") -T = TypeVar("T", AdminSetting, BaseData, PluginBlock, PluginCd, PluginCount, PluginSetting, Plugin) +T = TypeVar("T") class StaticData(Generic[T]): diff --git a/utils/manager/group_manager.py b/utils/manager/group_manager.py index 8eebbfd1..b522dd8b 100644 --- a/utils/manager/group_manager.py +++ b/utils/manager/group_manager.py @@ -1,5 +1,5 @@ import copy -from typing import List, Union, Dict, Callable +from typing import List, Union, Dict, Callable, Any from pathlib import Path from .models import BaseData, BaseGroup from utils.manager.data_class import StaticData @@ -57,7 +57,7 @@ def init_task(func: Callable): return wrapper -class GroupManager(StaticData): +class GroupManager(StaticData[BaseData]): """ 群权限 | 功能 | 总开关 | 聊天时间 管理器 """ @@ -354,6 +354,9 @@ class GroupManager(StaticData): with open(path, "w", encoding="utf8") as f: json.dump(dict_data, f, ensure_ascii=False, indent=4) + def get(self, key: str, default: Any = None) -> BaseGroup: + return self._data.group_manager.get(key, default) + def __setitem__(self, key, value): self._data.group_manager[key] = value diff --git a/utils/manager/models.py b/utils/manager/models.py index 89d2e037..ae7da83f 100644 --- a/utils/manager/models.py +++ b/utils/manager/models.py @@ -113,6 +113,7 @@ class PluginData(BaseModel): name: str plugin_type: PluginType # 插件内部类型,根据name [Hidden] [Admin] [SUPERUSER] usage: Optional[str] + superuser_usage: Optional[str] des: Optional[str] task: Optional[Dict[str, str]] menu_type: Tuple[Union[str, int], ...] = ("normal",) # 菜单类型 diff --git a/utils/manager/plugin_data_manager.py b/utils/manager/plugin_data_manager.py index 2e7d292c..0e5063d8 100644 --- a/utils/manager/plugin_data_manager.py +++ b/utils/manager/plugin_data_manager.py @@ -4,7 +4,7 @@ from . import StaticData from .models import PluginData -class PluginDataManager(StaticData): +class PluginDataManager(StaticData[PluginData]): """ 插件所有信息管理 """ diff --git a/utils/manager/plugins2block_manager.py b/utils/manager/plugins2block_manager.py index 7312580f..92259ffc 100644 --- a/utils/manager/plugins2block_manager.py +++ b/utils/manager/plugins2block_manager.py @@ -9,7 +9,7 @@ from .models import PluginBlock _yaml = yaml.YAML(typ="safe") -class Plugins2blockManager(StaticData): +class Plugins2blockManager(StaticData[PluginBlock]): """ 插件命令阻塞 管理器 """ diff --git a/utils/manager/plugins2cd_manager.py b/utils/manager/plugins2cd_manager.py index b819939b..40aa668a 100644 --- a/utils/manager/plugins2cd_manager.py +++ b/utils/manager/plugins2cd_manager.py @@ -9,7 +9,7 @@ from .models import PluginCd _yaml = yaml.YAML(typ="safe") -class Plugins2cdManager(StaticData): +class Plugins2cdManager(StaticData[PluginCd]): """ 插件命令 cd 管理器 """ diff --git a/utils/manager/plugins2count_manager.py b/utils/manager/plugins2count_manager.py index 9b593f1e..09b305df 100644 --- a/utils/manager/plugins2count_manager.py +++ b/utils/manager/plugins2count_manager.py @@ -9,7 +9,7 @@ from .models import PluginCount _yaml = yaml.YAML(typ="safe") -class Plugins2countManager(StaticData): +class Plugins2countManager(StaticData[PluginCount]): """ 插件命令 次数 管理器 """ diff --git a/utils/manager/plugins2settings_manager.py b/utils/manager/plugins2settings_manager.py index 28443eba..0148069b 100644 --- a/utils/manager/plugins2settings_manager.py +++ b/utils/manager/plugins2settings_manager.py @@ -7,7 +7,7 @@ from .models import PluginSetting, PluginType _yaml = yaml.YAML(typ="safe") -class Plugins2settingsManager(StaticData): +class Plugins2settingsManager(StaticData[PluginSetting]): """ 插件命令阻塞 管理器 """ diff --git a/utils/manager/plugins_manager.py b/utils/manager/plugins_manager.py index 5ae7abc4..909374cf 100644 --- a/utils/manager/plugins_manager.py +++ b/utils/manager/plugins_manager.py @@ -30,7 +30,7 @@ def init_plugin(func: Callable): return wrapper -class PluginsManager(StaticData): +class PluginsManager(StaticData[Plugin]): """ 插件 管理器 """