diff --git a/.env.dev b/.env.dev index e7257740..115bcb61 100644 --- a/.env.dev +++ b/.env.dev @@ -21,32 +21,32 @@ PLATFORM_SUPERUSERS = ' DRIVER=~fastapi+~httpx+~websockets # kook adapter toekn -kaiheila_bots =[{"token": ""}] +# kaiheila_bots =[{"token": ""}] -# discode adapter -DISCORD_BOTS=' -[ - { - "token": "", - "intent": { - "guild_messages": true, - "direct_messages": true - }, - "application_commands": {"*": ["*"]} - } -] -' -DISCORD_PROXY='' +# # discode adapter +# DISCORD_BOTS=' +# [ +# { +# "token": "", +# "intent": { +# "guild_messages": true, +# "direct_messages": true +# }, +# "application_commands": {"*": ["*"]} +# } +# ] +# ' +# DISCORD_PROXY='' -# dodo adapter -DODO_BOTS=' -[ - { - "client_id": "", - "token": "" - } -] -' +# # dodo adapter +# DODO_BOTS=' +# [ +# { +# "client_id": "", +# "token": "" +# } +# ] +# ' # application_commands的{"*": ["*"]}代表将全部应用命令注册为全局应用命令 # {"admin": ["123", "456"]}则代表将admin命令注册为id是123、456服务器的局部命令,其余命令不注册 @@ -55,3 +55,5 @@ LOG_LEVEL=DEBUG # 服务器和端口 HOST = 127.0.0.1 PORT = 8080 + + \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 6d295644..0d5c4663 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,6 +4,7 @@ "PYTHONPATH": "${workspaceFolder}${pathSeparator}${env:PYTHONPATH}" }, "cSpell.words": [ + "aiofiles", "Alconna", "arclet", "Arparma", diff --git a/zhenxun/builtin_plugins/admin/ban/__init__.py b/zhenxun/builtin_plugins/admin/ban/__init__.py index 330eaeb5..822bcf98 100644 --- a/zhenxun/builtin_plugins/admin/ban/__init__.py +++ b/zhenxun/builtin_plugins/admin/ban/__init__.py @@ -26,7 +26,7 @@ from ._data_source import BanManage base_config = Config.get("ban") __plugin_meta__ = PluginMetadata( - name="封禁用户/群组", + name="Ban", description="你被逮捕了!丢进小黑屋!封禁用户以及群组,屏蔽消息", usage=""" 普通管理员 @@ -37,8 +37,13 @@ __plugin_meta__ = PluginMetadata( ban @用户 : 永久拉黑用户 ban @用户 100 : 拉黑用户100分钟 unban @用户 : 从小黑屋中拉出来 - - 超级管理员额外命令 + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + plugin_type=PluginType.SUPER_AND_ADMIN, + superuser_help=""" + 超级管理员额外命令 格式: ban [At用户/用户Id] [时长] ban列表: 获取所有Ban数据 @@ -54,11 +59,7 @@ __plugin_meta__ = PluginMetadata( unban 123456789 : 从小黑屋中拉出来 unban -g 999999 : 将群组9999999从小黑屋中拉出来 - """.strip(), - extra=PluginExtraData( - author="HibiKier", - version="0.1", - plugin_type=PluginType.SUPER_AND_ADMIN, + """, admin_level=base_config.get("BAN_LEVEL", 5), configs=[ RegisterConfig( diff --git a/zhenxun/builtin_plugins/admin/plugin_switch/__init__.py b/zhenxun/builtin_plugins/admin/plugin_switch/__init__.py index 59295c44..bbaab13b 100644 --- a/zhenxun/builtin_plugins/admin/plugin_switch/__init__.py +++ b/zhenxun/builtin_plugins/admin/plugin_switch/__init__.py @@ -32,22 +32,24 @@ __plugin_meta__ = PluginMetadata( 关闭签到 : 关闭签到 开启群被动早晚安 : 关闭被动任务早晚安 - 超级管理员额外命令 - 格式: - 插件列表 - 开启/关闭[功能名称] ?[-t ["private", "p", "group", "g"](关闭类型)] ?[-g 群组Id] - - 私聊下: - 示例: - 开启签到 : 全局开启签到 - 关闭签到 : 全局关闭签到 - 关闭签到 p : 全局私聊关闭签到 - 关闭签到 -g 12345678 : 关闭群组12345678的签到功能(普通管理员无法开启) """.strip(), extra=PluginExtraData( author="HibiKier", version="0.1", plugin_type=PluginType.SUPER_AND_ADMIN, + superuser_help=""" + 超级管理员额外命令 + 格式: + 插件列表 + 开启/关闭[功能名称] ?[-t ["private", "p", "group", "g"](关闭类型)] ?[-g 群组Id] + + 私聊下: + 示例: + 开启签到 : 全局开启签到 + 关闭签到 : 全局关闭签到 + 关闭签到 p : 全局私聊关闭签到 + 关闭签到 -g 12345678 : 关闭群组12345678的签到功能(普通管理员无法开启) + """, admin_level=base_config.get("CHANGE_GROUP_SWITCH_LEVEL", 2), configs=[ RegisterConfig( diff --git a/zhenxun/builtin_plugins/help/__init__.py b/zhenxun/builtin_plugins/help/__init__.py index 8f6fd606..3aa6021d 100644 --- a/zhenxun/builtin_plugins/help/__init__.py +++ b/zhenxun/builtin_plugins/help/__init__.py @@ -1,8 +1,9 @@ from pathlib import Path +from nonebot.adapters import Bot from nonebot.plugin import PluginMetadata from nonebot.rule import to_me -from nonebot_plugin_alconna import Alconna, Args, Match, on_alconna +from nonebot_plugin_alconna import Alconna, AlconnaQuery, Args, Match, Option, Query, on_alconna, store_true from nonebot_plugin_saa import Image, Text from nonebot_plugin_session import EventSession @@ -43,6 +44,7 @@ _matcher = on_alconna( Alconna( "功能", Args["name?", str], + Option("-s|--superuser", action=store_true, help_text="超级用户帮助") ), aliases={"help", "帮助"}, rule=to_me(), @@ -53,11 +55,18 @@ _matcher = on_alconna( @_matcher.handle() async def _( + bot: Bot, name: Match[str], session: EventSession, + is_superuser: Query[bool] = AlconnaQuery("superuser.value", False) ): + _is_superuser = False + if is_superuser.available: + _is_superuser = is_superuser.result if name.available: - if result := await get_plugin_help(name.result): + if _is_superuser and session.id1 not in bot.config.superusers: + _is_superuser = False + if result := await get_plugin_help(name.result, _is_superuser): if isinstance(result, BuildImage): await Image(result.pic2bytes()).send(reply=True) else: diff --git a/zhenxun/builtin_plugins/help/_data_source.py b/zhenxun/builtin_plugins/help/_data_source.py index 0f6a010f..68da9f41 100644 --- a/zhenxun/builtin_plugins/help/_data_source.py +++ b/zhenxun/builtin_plugins/help/_data_source.py @@ -21,19 +21,30 @@ async def create_help_img(group_id: str | None): await HelpImageBuild().build_image(group_id) -async def get_plugin_help(name: str) -> str | BuildImage: +async def get_plugin_help(name: str, is_superuser: bool) -> str | BuildImage: """获取功能的帮助信息 参数: name: 插件名称 + is_superuser: 是否为超级用户 """ - if plugin := await PluginInfo.get_or_none(name=name): + if plugin := await PluginInfo.get_or_none(name__iexact=name): _plugin = nonebot.get_plugin_by_module_name(plugin.module_path) if _plugin and _plugin.metadata: - items = { - "简介": _plugin.metadata.description, - "用法": _plugin.metadata.usage, - } - return await ImageTemplate.hl_page(name, items) + items = None + if is_superuser: + extra = _plugin.metadata.extra + if usage := extra.get("superuser_help"): + items = { + "简介": _plugin.metadata.description, + "用法": usage, + } + else: + items = { + "简介": _plugin.metadata.description, + "用法": _plugin.metadata.usage, + } + if items: + return await ImageTemplate.hl_page(name, items) return "糟糕! 该功能没有帮助喔..." return "没有查找到这个功能噢..." diff --git a/zhenxun/configs/utils/__init__.py b/zhenxun/configs/utils/__init__.py index abe5d03c..d6a28180 100644 --- a/zhenxun/configs/utils/__init__.py +++ b/zhenxun/configs/utils/__init__.py @@ -194,6 +194,21 @@ class ConfigsManager: f"**********************************************" ) + def set_name(self, module: str, name: str): + """设置插件配置中文名出 + + 参数: + module: 模块名 + name: 中文名称 + + 异常: + ValueError: module不能为为空 + """ + if not module: + raise ValueError("set_name: module不能为为空") + if data := self._data.get(module): + data.name = name + def add_plugin_config( self, module: str, @@ -219,8 +234,8 @@ class ConfigsManager: _override: 强制覆盖值. 异常: - ValueError: _description_ - ValueError: _description_ + ValueError: module和key不能为为空 + ValueError: 填写错误 """ if not module or not key: @@ -297,7 +312,7 @@ class ConfigsManager: if module in self._data.keys(): config = self._data[module].configs.get(key) if not config: - config = self._data[module].configs.get(f"{key} [LEVEL]") + config = self._data[module].configs.get(key) if not config: raise NoSuchConfig( f"未查询到配置项 MODULE: [ {module} ] | KEY: [ {key} ]" diff --git a/zhenxun/plugins/image_management/__init__.py b/zhenxun/plugins/image_management/__init__.py new file mode 100644 index 00000000..8f24386d --- /dev/null +++ b/zhenxun/plugins/image_management/__init__.py @@ -0,0 +1,69 @@ +from pathlib import Path +from typing import List, Tuple + +import nonebot + +from zhenxun.configs.config import Config +from zhenxun.configs.path_config import IMAGE_PATH + +Config.add_plugin_config( + "image_management", + "IMAGE_DIR_LIST", + ["美图", "萝莉", "壁纸"], + help="公开图库列表,可自定义添加 [如果含有send_setu插件,请不要添加色图库]", + default_value=[], + type=List[str], +) + +Config.add_plugin_config( + "image_management", + "WITHDRAW_IMAGE_MESSAGE", + (0, 1), + help="自动撤回,参1:延迟撤回发送图库图片的时间(秒),0 为关闭 | 参2:监控聊天类型,0(私聊) 1(群聊) 2(群聊+私聊)", + default_value=(0, 1), + type=Tuple[int, int], +) + +Config.add_plugin_config( + "image_management:delete_image", + "DELETE_IMAGE_LEVEL", + 7, + help="删除图库图片需要的管理员等级", + default_value=7, + type=int, +) + +Config.add_plugin_config( + "image_management:move_image", + "MOVE_IMAGE_LEVEL", + 7, + help="移动图库图片需要的管理员等级", + default_value=7, + type=int, +) + +Config.add_plugin_config( + "image_management:upload_image", + "UPLOAD_IMAGE_LEVEL", + 6, + help="上传图库图片需要的管理员等级", + default_value=6, + type=int, +) + +Config.add_plugin_config( + "image_management", + "SHOW_ID", + True, + help="是否消息显示图片下标id", + default_value=True, + type=bool, +) + +Config.set_name("image_management", "图库操作") + + +(IMAGE_PATH / "image_management").mkdir(parents=True, exist_ok=True) + + +nonebot.load_plugins(str(Path(__file__).parent.resolve())) diff --git a/zhenxun/plugins/image_management/_config.py b/zhenxun/plugins/image_management/_config.py new file mode 100644 index 00000000..d5e01f58 --- /dev/null +++ b/zhenxun/plugins/image_management/_config.py @@ -0,0 +1,14 @@ +from strenum import StrEnum + + +class ImageHandleType(StrEnum): + """ + 图片处理类型 + """ + + UPLOAD = "UPLOAD" + """上传""" + DELETE = "DELETE" + """删除""" + MOVE = "MOVE" + """移动""" diff --git a/zhenxun/plugins/image_management/_data_source.py b/zhenxun/plugins/image_management/_data_source.py new file mode 100644 index 00000000..bc26b74f --- /dev/null +++ b/zhenxun/plugins/image_management/_data_source.py @@ -0,0 +1,196 @@ +import os +import random +from pathlib import Path + +import aiofiles + +from zhenxun.configs.path_config import IMAGE_PATH +from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncHttpx +from zhenxun.utils.utils import cn2py + +from .image_management_log import ImageHandleType, ImageManagementLog + +BASE_PATH = IMAGE_PATH / "image_management" + + +class ImageManagementManage: + + @classmethod + async def random_image(cls, name: str, file_id: int | None = None) -> Path | None: + """随机图片 + + 参数: + name: 图库名称 + file_id: 图片id. + + 返回: + Path | None: 图片路径 + """ + path = BASE_PATH / name + file_name = f"{file_id}.jpg" + if file_id is None: + if file_list := os.listdir(path): + file_name = random.choice(file_list) + _file = path / file_name + if not _file.exists(): + return None + return _file + + @classmethod + async def upload_image( + cls, + image_data: bytes | str, + name: str, + user_id: str, + platform: str | None = None, + ) -> str | None: + """上传图片 + + 参数: + image_data: 图片bytes + name: 图库名称 + user_id: 用户id + platform: 所属平台 + + 返回: + str | None: 文件名称 + """ + path = BASE_PATH / cn2py(name) + path.mkdir(exist_ok=True, parents=True) + _file_name = 0 + if file_list := os.listdir(path): + file_list.sort() + _file_name = int(file_list[-1].split(".")[0]) + 1 + _file_path = path / f"{_file_name}.jpg" + try: + await ImageManagementLog.create( + user_id=user_id, + path=_file_path, + handle_type=ImageHandleType.UPLOAD, + platform=platform, + ) + if isinstance(image_data, str): + await AsyncHttpx.download_file(image_data, _file_path) + else: + async with aiofiles.open(_file_path, "wb") as f: + await f.write(image_data) + logger.info( + f"上传图片至 {name}, 路径: {_file_path}", + "上传图片", + session=user_id, + ) + return f"{_file_name}.jpg" + except Exception as e: + logger.error("上传图片错误", "上传图片", e=e) + return None + + @classmethod + async def delete_image( + cls, name: str, file_id: int, user_id: str, platform: str | None = None + ) -> bool: + """删除图片 + + 参数: + name: 图库名称 + file_id: 图片id + user_id: 用户id + platform: 所属平台. + + 返回: + bool: 是否删除成功 + """ + path = BASE_PATH / cn2py(name) + if not path.exists(): + return False + _file_path = path / f"{file_id}.jpg" + if not _file_path.exists(): + return False + try: + await ImageManagementLog.create( + user_id=user_id, + path=_file_path, + handle_type=ImageHandleType.DELETE, + platform=platform, + ) + _file_path.unlink() + logger.info( + f"图库: {name}, 删除图片路径: {_file_path}", "删除图片", session=user_id + ) + if file_list := os.listdir(path): + file_list.sort() + _file_name = file_list[-1].split(".")[0] + _move_file = path / f"{_file_name}.jpg" + _move_file.rename(_file_path) + logger.info( + f"图库: {name}, 移动图片名称: {_file_name}.jpg -> {file_id}.jpg", + "删除图片", + session=user_id, + ) + except Exception as e: + logger.error("删除图片错误", "删除图片", e=e) + return False + return True + + @classmethod + async def move_image( + cls, + a_name: str, + b_name: str, + file_id: int, + user_id: str, + platform: str | None = None, + ) -> str | None: + """移动图片 + + 参数: + a_name: 源图库 + b_name: 模板图库 + file_id: 图片id + user_id: 用户id + platform: 所属平台. + + 返回: + bool: 是否移动成功 + """ + source_path = BASE_PATH / cn2py(a_name) + if not source_path.exists(): + return None + destination_path = BASE_PATH / cn2py(b_name) + destination_path.mkdir(exist_ok=True, parents=True) + source_file = source_path / f"{file_id}.jpg" + if not source_file.exists(): + return None + _destination_name = 0 + if file_list := os.listdir(destination_path): + file_list.sort() + _destination_name = int(file_list[-1].split(".")[0]) + 1 + destination_file = destination_path / f"{_destination_name}.jpg" + try: + await ImageManagementLog.create( + user_id=user_id, + path=source_file, + move=destination_file, + handle_type=ImageHandleType.MOVE, + platform=platform, + ) + source_file.rename(destination_file) + logger.info( + f"图库: {a_name} -> {b_name}, 移动图片路径: {source_file} -> {destination_file}", + "移动图片", + session=user_id, + ) + if file_list := os.listdir(source_path): + file_list.sort() + _file_name = file_list[-1].split(".")[0] + _move_file = source_path / f"{_file_name}.jpg" + _move_file.rename(source_file) + logger.info( + f"图库: {a_name}, 移动图片名称: {_file_name}.jpg -> {file_id}.jpg", + "移动图片", + session=user_id, + ) + except Exception as e: + logger.error("移动图片错误", "移动图片", e=e) + return None + return f"{source_file} -> {destination_file}" diff --git a/zhenxun/plugins/image_management/delete_image.py b/zhenxun/plugins/image_management/delete_image.py new file mode 100644 index 00000000..bccbe3d2 --- /dev/null +++ b/zhenxun/plugins/image_management/delete_image.py @@ -0,0 +1,109 @@ +from nonebot.adapters import Bot +from nonebot.plugin import PluginMetadata +from nonebot.rule import to_me +from nonebot.typing import T_State +from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, UniMessage, on_alconna +from nonebot_plugin_saa import Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import Config +from zhenxun.configs.utils import PluginExtraData +from zhenxun.services.log import logger +from zhenxun.utils.enum import PluginType + +from ._data_source import ImageManagementManage + +base_config = Config.get("image_management") + +__plugin_meta__ = PluginMetadata( + name="删除图片", + description="不好看的图片删掉删掉!", + usage=""" + 指令: + 删除图片 [图库] [id] + 查看图库 + 示例:删除图片 美图 666 + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + plugin_type=PluginType.ADMIN, + admin_level=base_config.get("DELETE_IMAGE_LEVEL"), + ).dict(), +) + + +_matcher = on_alconna( + Alconna("删除图片", Args["name?", str]["index?", str]), + rule=to_me(), + priority=5, + block=True, +) + + +@_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + arparma: Arparma, + name: Match[str], + index: Match[str], + state: T_State, +): + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if not image_dir_list: + await Text("未发现任何图库").finish() + _text = "" + for i, dir in enumerate(image_dir_list): + _text += f"{i}. {dir}\n" + state["dir_list"] = _text[:-1] + if name.available: + _matcher.set_path_arg("name", name.result) + if index.available: + _matcher.set_path_arg("index", index.result) + + +@_matcher.got_path( + "name", + prompt=UniMessage.template( + "请输入要删除的目标图库(id 或 名称)【发送'取消', '算了'来取消操作】\n{dir_list}" + ), +) +async def _(name: str): + if name in ["取消", "算了"]: + await Text("已取消操作...").finish() + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if name.isdigit(): + index = int(name) + if index <= len(image_dir_list) - 1: + name = image_dir_list[index] + if name not in image_dir_list: + await _matcher.reject_path("name", "此目录不正确,请重新输入目录!") + _matcher.set_path_arg("name", name) + + +@_matcher.got_path("index", "请输入要删除的图片id?【发送'取消', '算了'来取消操作】") +async def _( + session: EventSession, + arparma: Arparma, + index: str, +): + if index in ["取消", "算了"]: + await Text("已取消操作...").finish() + if not index.isdigit(): + await _matcher.reject_path("index", "图片id需要输入数字...") + name = _matcher.get_path_arg("name", None) + if not name: + await Text("图库名称为空...").finish() + if not session.id1: + await Text("用户id为空...").finish() + if file_name := await ImageManagementManage.delete_image( + name, int(index), session.id1, session.platform + ): + logger.info( + f"删除图片成功 图库: {name} --- 名称: {file_name}", + arparma.header_result, + session=session, + ) + await Text(f"删除图片成功!\n图库: {name}\n名称: {index}.jpg").finish() + await Text("图片删除失败...").finish() diff --git a/zhenxun/plugins/image_management/image_management_log.py b/zhenxun/plugins/image_management/image_management_log.py new file mode 100644 index 00000000..756e58f2 --- /dev/null +++ b/zhenxun/plugins/image_management/image_management_log.py @@ -0,0 +1,27 @@ +from tortoise import fields + +from zhenxun.services.db_context import Model + +from ._config import ImageHandleType + + +class ImageManagementLog(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + user_id = fields.CharField(255, description="用户id") + """用户id""" + path = fields.TextField(description="图片路径") + """图片路径""" + move = fields.TextField(null=True, description="移动路径") + """移动路径""" + handle_type = fields.CharEnumField(ImageHandleType, description="操作类型") + """操作类型""" + create_time = fields.DatetimeField(auto_now_add=True, description="创建时间") + """创建时间""" + platform = fields.CharField(255, null=True, description="平台") + """平台""" + + class Meta: + table = "image_management_log" + table_description = "画廊操作记录" diff --git a/zhenxun/plugins/image_management/move_image.py b/zhenxun/plugins/image_management/move_image.py new file mode 100644 index 00000000..44157f3e --- /dev/null +++ b/zhenxun/plugins/image_management/move_image.py @@ -0,0 +1,134 @@ +from nonebot.adapters import Bot +from nonebot.plugin import PluginMetadata +from nonebot.rule import to_me +from nonebot.typing import T_State +from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, UniMessage, on_alconna +from nonebot_plugin_saa import Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import Config +from zhenxun.configs.utils import PluginExtraData +from zhenxun.services.log import logger +from zhenxun.utils.enum import PluginType + +from ._data_source import ImageManagementManage + +base_config = Config.get("image_management") + +__plugin_meta__ = PluginMetadata( + name="移动图片", + description="图库间的图片移动操作", + usage=""" + 指令: + 移动图片 [源图库] [目标图库] [id] + 查看图库 + 示例:移动图片 萝莉 美图 234 + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + plugin_type=PluginType.ADMIN, + admin_level=base_config.get("MOVE_IMAGE_LEVEL"), + ).dict(), +) + + +_matcher = on_alconna( + Alconna("移动图片", Args["source?", str]["destination?", str]["index?", str]), + rule=to_me(), + priority=5, + block=True, +) + + +@_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + arparma: Arparma, + source: Match[str], + destination: Match[str], + index: Match[str], + state: T_State, +): + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if not image_dir_list: + await Text("未发现任何图库").finish() + _text = "" + for i, dir in enumerate(image_dir_list): + _text += f"{i}. {dir}\n" + state["dir_list"] = _text[:-1] + if source.available: + _matcher.set_path_arg("source", source.result) + if destination.available: + _matcher.set_path_arg("destination", destination.result) + if index.available: + _matcher.set_path_arg("index", index.result) + + +@_matcher.got_path( + "source", + prompt=UniMessage.template( + "要从哪个图库移出?【发送'取消', '算了'来取消操作】\n{dir_list}" + ), +) +async def _(source: str): + if source in ["取消", "算了"]: + await Text("已取消操作...").finish() + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if source.isdigit(): + index = int(source) + if index <= len(image_dir_list) - 1: + name = image_dir_list[index] + if name not in image_dir_list: + await _matcher.reject_path("source", "此目录不正确,请重新输入目录!") + _matcher.set_path_arg("source", name) + + +@_matcher.got_path( + "destination", + prompt=UniMessage.template( + "要移动到哪个图库?【发送'取消', '算了'来取消操作】\n{dir_list}" + ), +) +async def _(destination: str): + if destination in ["取消", "算了"]: + await Text("已取消操作...").finish() + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if destination.isdigit(): + index = int(destination) + if index <= len(image_dir_list) - 1: + name = image_dir_list[index] + if name not in image_dir_list: + await _matcher.reject_path("destination", "此目录不正确,请重新输入目录!") + _matcher.set_path_arg("destination", name) + + +@_matcher.got_path("index", "要移动的图片id是?【发送'取消', '算了'来取消操作】") +async def _( + session: EventSession, + arparma: Arparma, + index: str, +): + if index in ["取消", "算了"]: + await Text("已取消操作...").finish() + if not index.isdigit(): + await _matcher.reject_path("index", "图片id需要输入数字...") + source = _matcher.get_path_arg("source", None) + destination = _matcher.get_path_arg("destination", None) + if not source: + await Text("图库名称为空...").finish() + if not destination: + await Text("图库名称为空...").finish() + if not session.id1: + await Text("用户id为空...").finish() + if file_name := await ImageManagementManage.move_image( + source, destination, int(index), session.id1, session.platform + ): + logger.info( + f"移动图片成功 图库: {source} -> {destination} --- 名称: {file_name}", + arparma.header_result, + session=session, + ) + await Text(f"移动图片成功!\n图库: {source} -> {destination}").finish() + await Text("图片删除失败...").finish() diff --git a/zhenxun/plugins/image_management/send_image.py b/zhenxun/plugins/image_management/send_image.py new file mode 100644 index 00000000..e69de29b diff --git a/zhenxun/plugins/image_management/upload_image.py b/zhenxun/plugins/image_management/upload_image.py new file mode 100644 index 00000000..6f5a636a --- /dev/null +++ b/zhenxun/plugins/image_management/upload_image.py @@ -0,0 +1,196 @@ +from nonebot.adapters import Bot +from nonebot.params import Arg, ArgStr, CommandArg +from nonebot.plugin import PluginMetadata +from nonebot.rule import to_me +from nonebot.typing import T_State +from nonebot.utils import P +from nonebot_plugin_alconna import Alconna, Args, Arparma +from nonebot_plugin_alconna import Image as alcImage +from nonebot_plugin_alconna import Match, UniMessage, UniMsg, image_fetch, on_alconna +from nonebot_plugin_apscheduler import scheduler +from nonebot_plugin_saa import Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import Config +from zhenxun.configs.utils import PluginExtraData +from zhenxun.services.log import logger +from zhenxun.utils.enum import PluginType +from zhenxun.utils.http_utils import AsyncHttpx + +from ._data_source import ImageManagementManage + +base_config = Config.get("image_management") + +__plugin_meta__ = PluginMetadata( + name="上传图片", + description="上传图片至指定图库", + usage=""" + 指令: + 查看图库 + 上传图片 [图库] [图片] + 示例:上传图片 美图 [图片] + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + plugin_type=PluginType.ADMIN, + admin_level=base_config.get("UPLOAD_IMAGE_LEVEL"), + ).dict(), +) + + +_upload_matcher = on_alconna( + Alconna("上传图片", Args["name?", str]["img?", alcImage]), + rule=to_me(), + priority=5, + block=True, +) + +_continuous_upload_matcher = on_alconna( + Alconna("连续上传图片", Args["name?", str]), + rule=to_me(), + priority=5, + block=True, +) + +_show_matcher = on_alconna(Alconna("查看公开图库"), priority=1, block=True) + + +@_show_matcher.handle() +async def _(): + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if not image_dir_list: + await Text("未发现任何图库").finish() + text = "公开图库列表:\n" + for i, e in enumerate(image_dir_list): + text += f"\t{i+1}.{e}\n" + await Text(text[:-1]).send() + + +@_upload_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + arparma: Arparma, + name: Match[str], + img: Match[bytes], + state: T_State, +): + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if not image_dir_list: + await Text("未发现任何图库").finish() + _text = "" + for i, dir in enumerate(image_dir_list): + _text += f"{i}. {dir}\n" + state["dir_list"] = _text[:-1] + if name.available: + _upload_matcher.set_path_arg("name", name.result) + if img.available: + result = await AsyncHttpx.get(img.result.url) # type: ignore + _upload_matcher.set_path_arg("img", result.content) + + +@_continuous_upload_matcher.handle() +async def _(bot: Bot, state: T_State, name: Match[str]): + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if not image_dir_list: + await Text("未发现任何图库").finish() + _text = "" + for i, dir in enumerate(image_dir_list): + _text += f"{i}. {dir}\n" + state["dir_list"] = _text[:-1] + if name.available: + _upload_matcher.set_path_arg("name", name.result) + + +@_continuous_upload_matcher.got_path( + "name", + prompt=UniMessage.template( + "请选择要上传的图库(id 或 名称)【发送'取消', '算了'来取消操作】\n{dir_list}" + ), +) +@_upload_matcher.got_path( + "name", + prompt=UniMessage.template( + "请选择要上传的图库(id 或 名称)【发送'取消', '算了'来取消操作】\n{dir_list}" + ), +) +async def _(name: str, state: T_State): + if name in ["取消", "算了"]: + await Text("已取消操作...").finish() + image_dir_list = base_config.get("IMAGE_DIR_LIST") + if name.isdigit(): + index = int(name) + if index <= len(image_dir_list) - 1: + name = image_dir_list[index] + if name not in image_dir_list: + await _upload_matcher.reject_path("name", "此目录不正确,请重新输入目录!") + _upload_matcher.set_path_arg("name", name) + + +@_upload_matcher.got_path("img", "图呢图呢图呢图呢!GKD!", image_fetch) +async def _( + bot: Bot, + session: EventSession, + arparma: Arparma, + img: bytes, +): + name = _upload_matcher.get_path_arg("name", None) + if not name: + await Text("图库名称为空...").finish() + if not session.id1: + await Text("用户id为空...").finish() + if file_name := await ImageManagementManage.upload_image( + img, name, session.id1, session.platform + ): + logger.info( + f"图库: {name} --- 名称: {file_name}", + arparma.header_result, + session=session, + ) + await Text(f"上传图片成功!\n图库: {name}\n名称: {file_name}").finish() + await Text("图片上传失败...").finish() + + +@_continuous_upload_matcher.got( + "img", "图呢图呢图呢图呢!GKD!【在最后一张图片中+‘stop’为停止】" +) +async def _( + bot: Bot, + arparma: Arparma, + session: EventSession, + state: T_State, + message: UniMsg, +): + name = _continuous_upload_matcher.get_path_arg("name", None) + if not name: + await Text("图库名称为空...").finish() + if not session.id1: + await Text("用户id为空...").finish() + if not state.get("img_list"): + state["img_list"] = [] + msg = message.extract_plain_text().strip().replace(arparma.header_result, "", 1) + if msg in ["取消", "算了"]: + await Text("已取消操作...").finish() + if msg != "stop": + for msg in message: + if isinstance(msg, alcImage): + state["img_list"].append(msg.url) + await _continuous_upload_matcher.reject("图再来!!【发送‘stop’为停止】") + if state["img_list"]: + await Text("正在下载, 请稍后...").send() + file_list = [] + for img in state["img_list"]: + if file_name := await ImageManagementManage.upload_image( + img, name, session.id1, session.platform + ): + file_list.append(img) + logger.info( + f"图库: {name} --- 名称: {file_name}", + "上传图片", + session=session, + ) + await Text( + f"上传图片成功!共上传了{len(file_list)}张图片\n图库: {name}\n名称: {', '.join(file_list)}" + ).finish() + await Text("图片上传失败...").finish()