新增插件商店api (#1659)

*  新增插件商店api

* chore(version): Update version to v0.2.2-7e15f20

---------

Co-authored-by: HibiKier <HibiKier@users.noreply.github.com>
This commit is contained in:
HibiKier 2024-09-29 20:47:58 +08:00 committed by GitHub
parent e8425f371c
commit d68a4099ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 419 additions and 248 deletions

View File

@ -1 +1 @@
__version__: v0.2.2-e08b388
__version__: v0.2.2-7e15f20

View File

@ -70,7 +70,7 @@ def install_requirement(plugin_path: Path):
class ShopManage:
@classmethod
@cached(60)
async def __get_data(cls) -> dict[str, StorePluginInfo]:
async def get_data(cls) -> dict[str, StorePluginInfo]:
"""获取插件信息数据
异常:
@ -150,7 +150,7 @@ class ShopManage:
返回:
BuildImage | str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.__get_data()
data: dict[str, StorePluginInfo] = await cls.get_data()
column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"]
plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "0.1") for p in plugin_list}
@ -184,7 +184,7 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.__get_data()
data: dict[str, StorePluginInfo] = await cls.get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]
@ -274,7 +274,7 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.__get_data()
data: dict[str, StorePluginInfo] = await cls.get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]
@ -306,7 +306,7 @@ class ShopManage:
返回:
BuildImage | str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.__get_data()
data: dict[str, StorePluginInfo] = await cls.get_data()
plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list}
filtered_data = [
@ -349,7 +349,7 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.__get_data()
data: dict[str, StorePluginInfo] = await cls.get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]

View File

@ -16,14 +16,23 @@ class StorePluginInfo(BaseModel):
"""插件信息"""
module: str
"""模块名"""
module_path: str
"""模块路径"""
description: str
"""简介"""
usage: str
"""用法"""
author: str
"""作者"""
version: str
"""版本"""
plugin_type: PluginType
"""插件类型"""
is_dir: bool
"""是否为文件夹插件"""
github_url: str | None
"""github链接"""
@property
def plugin_type_name(self):

View File

@ -20,8 +20,10 @@ from .api.tabs.manage import router as manage_router
from .api.tabs.system import router as system_router
from .api.tabs.main import ws_router as status_routes
from .api.tabs.database import router as database_router
from .api.tabs.dashboard import router as dashboard_router
from .api.tabs.manage.chat import ws_router as chat_routes
from .api.tabs.plugin_manage import router as plugin_router
from .api.tabs.plugin_manage.store import router as store_router
__plugin_meta__ = PluginMetadata(
name="WebUi",
@ -71,6 +73,8 @@ BaseApiRouter = APIRouter(prefix="/zhenxun/api")
BaseApiRouter.include_router(auth_router)
BaseApiRouter.include_router(store_router)
BaseApiRouter.include_router(dashboard_router)
BaseApiRouter.include_router(main_router)
BaseApiRouter.include_router(manage_router)
BaseApiRouter.include_router(database_router)

View File

@ -1 +1 @@
from .tabs import *
from .tabs import * # noqa: F403

View File

@ -1 +1 @@
from .logs import *
from .logs import * # noqa: F403

View File

@ -1,7 +1,6 @@
import asyncio
from typing import Awaitable, Callable, Generic, TypeVar
PATTERN = r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))"
from typing import Generic, TypeVar
from collections.abc import Callable, Awaitable
_T = TypeVar("_T")
LogListener = Callable[[_T], Awaitable[None]]
@ -22,14 +21,13 @@ class LogStorage(Generic[_T]):
self.logs[seq] = log
asyncio.get_running_loop().call_later(self.rotation, self.remove, seq)
await asyncio.gather(
*map(lambda listener: listener(log), self.listeners),
*(listener(log) for listener in self.listeners),
return_exceptions=True,
)
return seq
def remove(self, seq: int):
del self.logs[seq]
return
LOG_STORAGE: LogStorage[str] = LogStorage[str]()

View File

@ -1,7 +1,7 @@
from fastapi import APIRouter, WebSocket
from loguru import logger
from fastapi import APIRouter
from nonebot.utils import escape_tag
from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState
from starlette.websockets import WebSocket, WebSocketState, WebSocketDisconnect
from .log_manager import LOG_STORAGE
@ -27,4 +27,3 @@ async def system_logs_realtime(websocket: WebSocket):
pass
finally:
LOG_STORAGE.listeners.remove(log_listener)
return

View File

@ -0,0 +1,22 @@
from nonebot import require
from fastapi import APIRouter
from ....base_model import Result
from .data_source import BotManage
from ....utils import authentication
require("plugin_store")
router = APIRouter(prefix="/dashboard")
@router.get(
"/get_bot_list",
dependencies=[authentication()],
deprecated="获取bot列表", # type: ignore
)
async def _() -> Result:
try:
return Result.ok(await BotManage.get_bot_list(), "拿到信息啦!")
except Exception as e:
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")

View File

@ -0,0 +1,81 @@
import time
from datetime import datetime, timedelta
import nonebot
from nonebot.adapters import Bot
from nonebot.drivers import Driver
from zhenxun.models.statistics import Statistics
from zhenxun.utils.platform import PlatformUtils
from zhenxun.models.chat_history import ChatHistory
from .model import BotInfo
from ..main.data_source import bot_live
driver: Driver = nonebot.get_driver()
CONNECT_TIME = 0
@driver.on_startup
async def _():
global CONNECT_TIME
CONNECT_TIME = int(time.time())
class BotManage:
@classmethod
async def __build_bot_info(cls, bot: Bot) -> BotInfo:
"""构建Bot信息
参数:
bot: Bot
返回:
BotInfo: Bot信息
"""
now = datetime.now()
platform = PlatformUtils.get_platform(bot) or ""
if platform == "qq":
login_info = await bot.get_login_info()
nickname = login_info["nickname"]
ava_url = PlatformUtils.get_user_avatar_url(bot.self_id, "qq") or ""
else:
nickname = bot.self_id
ava_url = ""
bot_info = BotInfo(
self_id=bot.self_id, nickname=nickname, ava_url=ava_url, platform=platform
)
group_list, _ = await PlatformUtils.get_group_list(bot)
group_list = [g for g in group_list if g.channel_id is None]
friend_list = await PlatformUtils.get_friend_list(bot)
bot_info.group_count = len(group_list)
bot_info.friend_count = len(friend_list)
bot_info.day_call = await Statistics.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
bot_info.received_messages = await ChatHistory.filter(
bot_id=bot_info.self_id,
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute),
).count()
bot_info.connect_time = bot_live.get(bot.self_id) or 0
if bot_info.connect_time:
connect_date = datetime.fromtimestamp(CONNECT_TIME)
connect_date_str = connect_date.strftime("%Y-%m-%d %H:%M:%S")
bot_info.connect_date = datetime.strptime(
connect_date_str, "%Y-%m-%d %H:%M:%S"
)
return bot_info
@classmethod
async def get_bot_list(cls) -> list[BotInfo]:
"""获取bot列表
返回:
list[BotInfo]: Bot列表
"""
bot_list: list[BotInfo] = []
for _, bot in nonebot.get_bots().items():
bot_list.append(await cls.__build_bot_info(bot))
return bot_list

View File

@ -0,0 +1,26 @@
from datetime import datetime
from pydantic import BaseModel
class BotInfo(BaseModel):
self_id: str
"""SELF ID"""
nickname: str
"""昵称"""
ava_url: str
"""头像url"""
platform: str
"""平台"""
friend_count: int = 0
"""好友数量"""
group_count: int = 0
"""群聊数量"""
received_messages: int = 0
"""今日消息接收"""
day_call: int = 0
"""今日调用插件次数"""
connect_time: int = 0
"""连接时间"""
connect_date: datetime | None = None
"""连接日期"""

View File

@ -4,7 +4,6 @@ from zhenxun.services.db_context import Model
class SqlLog(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
ip = fields.CharField(255)
@ -18,7 +17,7 @@ class SqlLog(Model):
create_time = fields.DatetimeField(auto_now_add=True)
"""创建时间"""
class Meta:
class Meta: # type: ignore
table = "sql_log"
table_description = "sql执行日志"

View File

@ -1,26 +1,27 @@
import asyncio
import time
from datetime import datetime, timedelta
import asyncio
import contextlib
from pathlib import Path
from datetime import datetime, timedelta
import nonebot
from fastapi import APIRouter, WebSocket
from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState
from fastapi import APIRouter
from tortoise.functions import Count
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
from starlette.websockets import WebSocket, WebSocketState, WebSocketDisconnect
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.group_info import GroupInfo
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.statistics import Statistics
from zhenxun.services.log import logger
from zhenxun.models.group_info import GroupInfo
from zhenxun.models.statistics import Statistics
from zhenxun.utils.platform import PlatformUtils
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.chat_history import ChatHistory
from ....base_model import Result
from ....config import AVA_URL, GROUP_AVA_URL, QueryDateType
from ....utils import authentication, get_system_status
from .data_source import bot_live
from .model import ActiveGroup, BaseInfo, ChatHistoryCount, HotPlugin
from ....utils import authentication, get_system_status
from ....config import AVA_URL, GROUP_AVA_URL, QueryDateType
from .model import BaseInfo, HotPlugin, ActiveGroup, ChatHistoryCount
run_time = time.time()
@ -131,7 +132,7 @@ async def _(bot_id: str) -> Result:
"/get_ch_count", dependencies=[authentication()], description="获取接收消息数量"
)
async def _(bot_id: str, query_type: QueryDateType | None = None) -> Result:
if bots := nonebot.get_bots():
if nonebot.get_bot(bot_id):
if not query_type:
return Result.ok(await ChatHistory.filter(bot_id=bot_id).count())
now = datetime.now()
@ -210,7 +211,6 @@ async def _(date_type: QueryDateType | None = None) -> Result:
.limit(5)
.values_list("group_id", "count")
)
active_group_list = []
id2name = {}
if data_list:
if info_list := await GroupInfo.filter(
@ -218,15 +218,15 @@ async def _(date_type: QueryDateType | None = None) -> Result:
).all():
for group_info in info_list:
id2name[group_info.group_id] = group_info.group_name
for data in data_list:
active_group_list.append(
active_group_list = [
ActiveGroup(
group_id=data[0],
name=id2name.get(data[0]) or data[0],
chat_num=data[1],
ava_img=GROUP_AVA_URL.format(data[0], data[0]),
)
)
for data in data_list
]
active_group_list = sorted(
active_group_list, key=lambda x: x.chat_num, reverse=True
)
@ -263,13 +263,7 @@ async def _(date_type: QueryDateType | None = None) -> Result:
for data in data_list:
module = data[0]
name = module2name.get(module) or module
hot_plugin_list.append(
HotPlugin(
module=data[0],
name=name,
count=data[1],
)
)
hot_plugin_list.append(HotPlugin(module=module, name=name, count=data[1]))
hot_plugin_list = sorted(hot_plugin_list, key=lambda x: x.count, reverse=True)
if len(hot_plugin_list) > 5:
hot_plugin_list = hot_plugin_list[:5]
@ -280,11 +274,11 @@ async def _(date_type: QueryDateType | None = None) -> Result:
async def system_logs_realtime(websocket: WebSocket, sleep: int = 5):
await websocket.accept()
logger.debug("ws system_status is connect")
try:
with contextlib.suppress(
WebSocketDisconnect, ConnectionClosedError, ConnectionClosedOK
):
while websocket.client_state == WebSocketState.CONNECTED:
system_status = await get_system_status()
await websocket.send_text(system_status.json())
await asyncio.sleep(sleep)
except (WebSocketDisconnect, ConnectionClosedError, ConnectionClosedOK):
pass
return

View File

@ -1,40 +1,40 @@
import nonebot
from fastapi import APIRouter
from nonebot.adapters.onebot.v11 import ActionFailed
from tortoise.functions import Count
from nonebot.adapters.onebot.v11 import ActionFailed
from zhenxun.configs.config import BotConfig
from zhenxun.models.ban_console import BanConsole
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.fg_request import FgRequest
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.statistics import Statistics
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
from zhenxun.utils.enum import RequestHandleType, RequestType
from zhenxun.utils.exception import NotFoundError
from zhenxun.configs.config import BotConfig
from zhenxun.models.task_info import TaskInfo
from zhenxun.models.fg_request import FgRequest
from zhenxun.models.statistics import Statistics
from zhenxun.utils.platform import PlatformUtils
from zhenxun.models.ban_console import BanConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.exception import NotFoundError
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.group_console import GroupConsole
from zhenxun.utils.enum import RequestType, RequestHandleType
from ....base_model import Result
from ....config import AVA_URL, GROUP_AVA_URL
from ....utils import authentication
from ....config import AVA_URL, GROUP_AVA_URL
from .model import (
ClearRequest,
DeleteFriend,
Task,
Friend,
FriendRequestResult,
GroupDetail,
GroupRequestResult,
GroupResult,
HandleRequest,
LeaveGroup,
Plugin,
ReqResult,
SendMessage,
Task,
UpdateGroup,
LeaveGroup,
UserDetail,
GroupDetail,
GroupResult,
SendMessage,
UpdateGroup,
ClearRequest,
DeleteFriend,
HandleRequest,
GroupRequestResult,
FriendRequestResult,
)
router = APIRouter(prefix="/manage")
@ -47,7 +47,8 @@ async def _(bot_id: str) -> Result:
"""
获取群信息
"""
if bots := nonebot.get_bots():
if not (bots := nonebot.get_bots()):
return Result.warning_("无Bot连接...")
if bot_id not in bots:
return Result.warning_("指定Bot未连接...")
group_list_result = []
@ -61,7 +62,6 @@ async def _(bot_id: str) -> Result:
logger.error("调用API错误", "/get_group_list", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(group_list_result, "拿到了新鲜出炉的数据!")
return Result.warning_("无Bot连接...")
@router.post(
@ -77,12 +77,8 @@ async def _(group: UpdateGroup) -> Result:
if group.close_plugins:
db_group.block_plugin = ",".join(group.close_plugins) + ","
if group.task:
block_task = []
for t in task_list:
if t not in group.task:
block_task.append(t)
if block_task:
db_group.block_task = ",".join(block_task) + ","
if block_task := [t for t in task_list if t not in group.task]:
db_group.block_task = ",".join(block_task) + "," # type: ignore
await db_group.save(
update_fields=["level", "status", "block_plugin", "block_task"]
)
@ -199,7 +195,7 @@ async def _(parma: HandleRequest) -> Result:
return Result.warning_("指定Bot未连接...")
try:
await FgRequest.refused(bots[bot_id], parma.id)
except ActionFailed as e:
except ActionFailed:
await FgRequest.expire(parma.id)
return Result.warning_("请求失败,可能该请求已失效或请求数据错误...")
except NotFoundError:
@ -226,7 +222,8 @@ async def _(parma: HandleRequest) -> Result:
bot_id = parma.bot_id
if bot_id not in nonebot.get_bots():
return Result.warning_("指定Bot未连接...")
if req := await FgRequest.get_or_none(id=parma.id):
if not (req := await FgRequest.get_or_none(id=parma.id)):
return Result.warning_("未找到此Id请求...")
if req.request_type == RequestType.GROUP:
if group := await GroupConsole.get_group(group_id=req.group_id):
group.group_flag = 1
@ -236,12 +233,10 @@ async def _(parma: HandleRequest) -> Result:
group_id=req.group_id,
defaults={"group_flag": 1},
)
else:
return Result.warning_("未找到此Id请求...")
try:
await FgRequest.approve(bots[bot_id], parma.id)
return Result.ok(info="成功处理了请求!")
except ActionFailed as e:
except ActionFailed:
await FgRequest.expire(parma.id)
return Result.warning_("请求失败,可能该请求已失效或请求数据错误...")
return Result.warning_("无Bot连接...")
@ -335,8 +330,10 @@ async def _(bot_id: str, user_id: str) -> Result:
"/get_group_detail", dependencies=[authentication()], description="获取群组详情"
)
async def _(bot_id: str, group_id: str) -> Result:
if bots := nonebot.get_bots():
if bot_id in bots:
if not (bots := nonebot.get_bots()):
return Result.warning_("无Bot连接...")
if bot_id not in bots:
return Result.warning_("未添加指定群组...")
group = await GroupConsole.get_or_none(group_id=group_id)
if not group:
return Result.warning_("指定群组未被收录...")
@ -403,16 +400,14 @@ async def _(bot_id: str, group_id: str) -> Result:
task=task_list,
)
return Result.ok(group_detail)
else:
return Result.warning_("未添加指定群组...")
return Result.warning_("无Bot连接...")
@router.post(
"/send_message", dependencies=[authentication()], description="获取群组详情"
)
async def _(param: SendMessage) -> Result:
if bots := nonebot.get_bots():
if not (bots := nonebot.get_bots()):
return Result.warning_("无Bot连接...")
if param.bot_id in bots:
platform = PlatformUtils.get_platform(bots[param.bot_id])
if platform != "qq":
@ -430,4 +425,3 @@ async def _(param: SendMessage) -> Result:
return Result.fail(str(e))
return Result.ok("发送成功!")
return Result.warning_("指定Bot未连接...")
return Result.warning_("无Bot连接...")

View File

@ -1,5 +1,3 @@
from typing import Literal
from pydantic import BaseModel
from zhenxun.utils.enum import RequestType
@ -232,7 +230,6 @@ class GroupDetail(BaseModel):
class MessageItem(BaseModel):
type: str
"""消息类型"""
msg: str

View File

@ -1,20 +1,20 @@
import re
import cattrs
from fastapi import APIRouter, Query
from fastapi import Query, APIRouter
from zhenxun.configs.config import Config
from zhenxun.models.plugin_info import PluginInfo as DbPluginInfo
from zhenxun.services.log import logger
from zhenxun.configs.config import Config
from zhenxun.utils.enum import BlockType, PluginType
from zhenxun.models.plugin_info import PluginInfo as DbPluginInfo
from ....base_model import Result
from ....utils import authentication
from .model import (
PluginConfig,
PluginCount,
PluginDetail,
PluginInfo,
PluginCount,
PluginConfig,
PluginDetail,
PluginSwitch,
UpdatePlugin,
)
@ -23,7 +23,9 @@ router = APIRouter(prefix="/plugin")
@router.get(
"/get_plugin_list", dependencies=[authentication()], deprecated="获取插件列表" # type: ignore
"/get_plugin_list",
dependencies=[authentication()],
deprecated="获取插件列表", # type: ignore
)
async def _(
plugin_type: list[PluginType] = Query(None), menu_type: str | None = None
@ -57,7 +59,9 @@ async def _(
@router.get(
"/get_plugin_count", dependencies=[authentication()], deprecated="获取插件数量" # type: ignore
"/get_plugin_count",
dependencies=[authentication()],
deprecated="获取插件数量", # type: ignore
)
async def _() -> Result:
plugin_count = PluginCount()
@ -93,10 +97,7 @@ async def _(plugin: UpdatePlugin) -> Result:
db_plugin.level = plugin.level
db_plugin.menu_type = plugin.menu_type
db_plugin.block_type = plugin.block_type
if plugin.block_type == BlockType.ALL:
db_plugin.status = False
else:
db_plugin.status = True
db_plugin.status = plugin.block_type != BlockType.ALL
await db_plugin.save()
# 配置项
if plugin.configs and (configs := Config.get(plugin.module)):
@ -149,17 +150,13 @@ async def _(module: str) -> Result:
for cfg in config.configs:
type_str = ""
type_inner = None
x = str(config.configs[cfg].type)
r = re.search(r"<class '(.*)'>", str(config.configs[cfg].type))
if r:
type_str = r.group(1)
else:
r = re.search(r"typing\.(.*)\[(.*)\]", str(config.configs[cfg].type))
if r:
type_str = r.group(1)
if r := re.search(r"<class '(.*)'>", str(config.configs[cfg].type)):
type_str = r[1]
elif r := re.search(r"typing\.(.*)\[(.*)\]", str(config.configs[cfg].type)):
type_str = r[1]
if type_str:
type_str = type_str.lower()
type_inner = r.group(2)
type_inner = r[2]
if type_inner:
type_inner = [x.strip() for x in type_inner.split(",")]
config_list.append(

View File

@ -123,3 +123,8 @@ class PluginDetail(PluginInfo):
"""
config_list: list[PluginConfig]
class PluginIr(BaseModel):
id: int
"""插件id"""

View File

@ -0,0 +1,50 @@
from nonebot import require
from fastapi import APIRouter
from .model import PluginIr
from ....base_model import Result
from ....utils import authentication
require("plugin_store")
from zhenxun.builtin_plugins.plugin_store import ShopManage
router = APIRouter(prefix="/store")
@router.get(
"/get_plugin_store",
dependencies=[authentication()],
deprecated="获取插件商店插件信息", # type: ignore
)
async def _() -> Result:
try:
data = await ShopManage.get_data()
return Result.ok(data)
except Exception as e:
return Result.fail(f"获取插件商店插件信息失败: {type(e)}: {e}")
@router.post(
"/install_plugin",
dependencies=[authentication()],
deprecated="安装插件", # type: ignore
)
async def _(param: PluginIr) -> Result:
try:
result = await ShopManage.add_plugin(param.id) # type: ignore
return Result.ok(result)
except Exception as e:
return Result.fail(f"安装插件失败: {type(e)}: {e}")
@router.post(
"/remove_plugin",
dependencies=[authentication()],
deprecated="移除插件", # type: ignore
)
async def _(param: PluginIr) -> Result:
try:
result = await ShopManage.remove_plugin(param.id) # type: ignore
return Result.ok(result)
except Exception as e:
return Result.fail(f"移除插件失败: {type(e)}: {e}")

View File

@ -1,15 +1,15 @@
import os
import shutil
from pathlib import Path
from typing import List, Optional
import aiofiles
from fastapi import APIRouter
from zhenxun.utils._build_image import BuildImage
from ....base_model import Result
from ....utils import authentication, get_system_disk
from .model import AddFile, DeleteFile, DirFile, RenameFile, SaveFile
from .model import AddFile, DirFile, SaveFile, DeleteFile, RenameFile
router = APIRouter(prefix="/system")
@ -19,16 +19,12 @@ IMAGE_TYPE = ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"]
@router.get(
"/get_dir_list", dependencies=[authentication()], description="获取文件列表"
)
async def _(path: Optional[str] = None) -> Result:
async def _(path: str | None = None) -> Result:
base_path = Path(path) if path else Path()
data_list = []
for file in os.listdir(base_path):
file_path = base_path / file
is_image = False
for t in IMAGE_TYPE:
if file.endswith(f".{t}"):
is_image = True
break
is_image = any(file.endswith(f".{t}") for t in IMAGE_TYPE)
data_list.append(
DirFile(
is_file=not file_path.is_dir(),
@ -43,7 +39,7 @@ async def _(path: Optional[str] = None) -> Result:
@router.get(
"/get_resources_size", dependencies=[authentication()], description="获取文件列表"
)
async def _(full_path: Optional[str] = None) -> Result:
async def _(full_path: str | None = None) -> Result:
return Result.ok(await get_system_disk(full_path))
@ -56,7 +52,7 @@ async def _(param: DeleteFile) -> Result:
path.unlink()
return Result.ok("删除成功!")
except Exception as e:
return Result.warning_("删除失败: " + str(e))
return Result.warning_(f"删除失败: {e!s}")
@router.post(
@ -70,7 +66,7 @@ async def _(param: DeleteFile) -> Result:
shutil.rmtree(path.absolute())
return Result.ok("删除成功!")
except Exception as e:
return Result.warning_("删除失败: " + str(e))
return Result.warning_(f"删除失败: {e!s}")
@router.post("/rename_file", dependencies=[authentication()], description="重命名文件")
@ -84,7 +80,7 @@ async def _(param: RenameFile) -> Result:
path.rename(path.parent / param.name)
return Result.ok("重命名成功!")
except Exception as e:
return Result.warning_("重命名失败: " + str(e))
return Result.warning_(f"重命名失败: {e!s}")
@router.post(
@ -101,7 +97,7 @@ async def _(param: RenameFile) -> Result:
shutil.move(path.absolute(), new_path.absolute())
return Result.ok("重命名成功!")
except Exception as e:
return Result.warning_("重命名失败: " + str(e))
return Result.warning_(f"重命名失败: {e!s}")
@router.post("/add_file", dependencies=[authentication()], description="新建文件")
@ -113,7 +109,7 @@ async def _(param: AddFile) -> Result:
path.open("w")
return Result.ok("新建文件成功!")
except Exception as e:
return Result.warning_("新建文件失败: " + str(e))
return Result.warning_(f"新建文件失败: {e!s}")
@router.post("/add_folder", dependencies=[authentication()], description="新建文件夹")
@ -125,7 +121,7 @@ async def _(param: AddFile) -> Result:
path.mkdir()
return Result.ok("新建文件夹成功!")
except Exception as e:
return Result.warning_("新建文件夹失败: " + str(e))
return Result.warning_(f"新建文件夹失败: {e!s}")
@router.get("/read_file", dependencies=[authentication()], description="读取文件")
@ -137,18 +133,18 @@ async def _(full_path: str) -> Result:
text = path.read_text(encoding="utf-8")
return Result.ok(text)
except Exception as e:
return Result.warning_("读取文件失败: " + str(e))
return Result.warning_(f"读取文件失败: {e!s}")
@router.post("/save_file", dependencies=[authentication()], description="读取文件")
async def _(param: SaveFile) -> Result:
path = Path(param.full_path)
try:
with path.open("w") as f:
f.write(param.content)
async with aiofiles.open(path, "w", encoding="utf-8") as f:
await f.write(param.content)
return Result.ok("更新成功!")
except Exception as e:
return Result.warning_("保存文件失败: " + str(e))
return Result.warning_(f"保存文件失败: {e!s}")
@router.get("/get_image", dependencies=[authentication()], description="读取图片base64")
@ -159,4 +155,4 @@ async def _(full_path: str) -> Result:
try:
return Result.ok(BuildImage.open(path).pic2bs4())
except Exception as e:
return Result.warning_("获取图片失败: " + str(e))
return Result.warning_(f"获取图片失败: {e!s}")

View File

@ -1,8 +1,8 @@
from datetime import datetime
from typing import Any, Generic, Optional, TypeVar
from typing_extensions import Self
from typing import Any, Generic, TypeVar
from pydantic import BaseModel, validator
from typing_extensions import Self
T = TypeVar("T")
@ -28,7 +28,7 @@ class Result(BaseModel):
"""code"""
info: str = "操作成功"
"""info"""
warning: Optional[str] = None
warning: str | None = None
"""警告信息"""
data: Any = None
"""返回数据"""
@ -102,7 +102,7 @@ class SystemFolderSize(BaseModel):
"""名称"""
size: float
"""大小"""
full_path: Optional[str]
full_path: str | None
"""完整路径"""
is_dir: bool
"""是否为文件夹"""