添加更新好友群组与修复其他

This commit is contained in:
HibiKier 2024-08-12 22:26:13 +08:00
parent 66efc0cc72
commit 18f97b9bd5
14 changed files with 384 additions and 202 deletions

View File

@ -1,8 +1,10 @@
import nonebot
from nonebot import on_notice from nonebot import on_notice
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import GroupIncreaseNoticeEvent from nonebot.adapters.onebot.v11 import GroupIncreaseNoticeEvent
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Arparma, on_alconna from nonebot_plugin_alconna import Alconna, Arparma, on_alconna
from nonebot_plugin_apscheduler import scheduler
from nonebot_plugin_session import EventSession from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME from zhenxun.configs.config import NICKNAME
@ -10,6 +12,7 @@ from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.rules import admin_check, ensure_group from zhenxun.utils.rules import admin_check, ensure_group
from ._data_source import MemberUpdateManage from ._data_source import MemberUpdateManage
@ -39,6 +42,9 @@ _matcher = on_alconna(
) )
_notice = on_notice(priority=1, block=False)
@_matcher.handle() @_matcher.handle()
async def _(bot: Bot, session: EventSession, arparma: Arparma): async def _(bot: Bot, session: EventSession, arparma: Arparma):
if gid := session.id3 or session.id2: if gid := session.id3 or session.id2:
@ -50,9 +56,6 @@ async def _(bot: Bot, session: EventSession, arparma: Arparma):
await MessageUtils.build_message("群组id为空...").send() await MessageUtils.build_message("群组id为空...").send()
_notice = on_notice(priority=1, block=False)
@_notice.handle() @_notice.handle()
async def _(bot: Bot, event: GroupIncreaseNoticeEvent): async def _(bot: Bot, event: GroupIncreaseNoticeEvent):
# TODO: 其他适配器的加群自动更新群组成员信息 # TODO: 其他适配器的加群自动更新群组成员信息
@ -64,3 +67,28 @@ async def _(bot: Bot, event: GroupIncreaseNoticeEvent):
session=event.user_id, session=event.user_id,
group_id=event.group_id, group_id=event.group_id,
) )
@scheduler.scheduled_job(
"interval",
minutes=5,
)
async def _():
for bot in nonebot.get_bots().values():
if PlatformUtils.get_platform(bot) == "qq":
try:
group_list, _ = await PlatformUtils.get_group_list(bot)
if group_list:
for group in group_list:
try:
await MemberUpdateManage.update(bot, group.group_id)
logger.debug("自动更新群组成员信息成功...")
except Exception as e:
logger.error(
f"Bot: {bot.self_id} 自动更新群组成员信息成功",
target=group.group_id,
e=e,
)
except Exception as e:
logger.error(f"Bot: {bot.self_id} 自动更新群组信息", e=e)
logger.debug(f"自动 Bot: {bot.self_id} 更新群组成员信息成功...")

View File

@ -236,6 +236,9 @@ class AuthChecker:
LimitManage.unblock( LimitManage.unblock(
matcher.plugin.name, user_id, group_id, channel_id matcher.plugin.name, user_id, group_id, channel_id
) )
except AssertionError as e:
is_ignore = True
logger.debug("消息无法发送", session=session, e=e)
if cost_gold and user_id: if cost_gold and user_id:
"""花费金币""" """花费金币"""
try: try:

View File

@ -23,13 +23,15 @@ async def _(bot: Bot):
if PlatformUtils.get_platform(bot) == "qq": if PlatformUtils.get_platform(bot) == "qq":
logger.debug(f"更新Bot: {bot.self_id} 的群认证...") logger.debug(f"更新Bot: {bot.self_id} 的群认证...")
group_list, _ = await PlatformUtils.get_group_list(bot) group_list, _ = await PlatformUtils.get_group_list(bot)
gid_list = [g.group_id for g in group_list] gid_list = [(g.group_id, g.group_name) for g in group_list]
db_group_list = await GroupConsole.all().values_list("group_id", flat=True) db_group_list = await GroupConsole.all().values_list("group_id", flat=True)
create_list = [] create_list = []
update_id = [] update_id = []
for gid in gid_list: for gid, name in gid_list:
if gid not in db_group_list: if gid not in db_group_list:
create_list.append(GroupConsole(group_id=gid, group_flag=1)) create_list.append(
GroupConsole(group_id=gid, group_name=name, group_flag=1)
)
else: else:
update_id.append(gid) update_id.append(gid)
if create_list: if create_list:

View File

@ -17,7 +17,7 @@ async def _():
try: try:
await PlatformUtils.update_group(bot) await PlatformUtils.update_group(bot)
except Exception as e: except Exception as e:
logger.error(f"Bot: {bot.self_id} 自动更新群组信息", e=e) logger.error(f"Bot: {bot.self_id} 自动更新群组信息", "自动更新群组", e=e)
logger.info("自动更新群组成员信息成功...") logger.info("自动更新群组成员信息成功...")
@ -33,5 +33,7 @@ async def _():
try: try:
await PlatformUtils.update_friend(bot) await PlatformUtils.update_friend(bot)
except Exception as e: except Exception as e:
logger.error(f"自动更新好友信息错误", "自动更新好友", e=e) logger.error(
f"Bot: {bot.self_id} 自动更新好友信息错误", "自动更新好友", e=e
)
logger.info("自动更新好友信息成功...") logger.info("自动更新好友信息成功...")

View File

@ -3,9 +3,7 @@ from io import BytesIO
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args from nonebot_plugin_alconna import Alconna, Args, At, Match, on_alconna
from nonebot_plugin_alconna import At as alcAt
from nonebot_plugin_alconna import Match, on_alconna
from nonebot_plugin_session import EventSession from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData from zhenxun.configs.utils import PluginExtraData
@ -29,7 +27,7 @@ __plugin_meta__ = PluginMetadata(
) )
_matcher = on_alconna( _matcher = on_alconna(
Alconna("one-friend", Args["text", str]["at?", alcAt]), priority=5, block=True Alconna("one-friend", Args["text", str]["at?", At]), priority=5, block=True
) )
_matcher.shortcut( _matcher.shortcut(
@ -41,7 +39,7 @@ _matcher.shortcut(
@_matcher.handle() @_matcher.handle()
async def _(bot: Bot, text: str, at: Match[alcAt], session: EventSession): async def _(bot: Bot, text: str, at: Match[At], session: EventSession):
gid = session.id3 or session.id2 gid = session.id3 or session.id2
if not gid: if not gid:
await MessageUtils.build_message("群组id为空...").finish() await MessageUtils.build_message("群组id为空...").finish()

View File

@ -115,27 +115,28 @@ async def _(session: EventSession, message: UniMsg):
if get_url: if get_url:
# 将链接统一发送给处理函数 # 将链接统一发送给处理函数
vd_info, live_info, vd_url, live_url, image_info, image_url = ( data = await parse_bili_url(get_url, information_container)
await parse_bili_url(get_url, information_container) if data.vd_info:
)
if vd_info:
# 判断一定时间内是否解析重复内容,或者是第一次解析 # 判断一定时间内是否解析重复内容,或者是第一次解析
if ( if (
vd_url in _tmp.keys() and time.time() - _tmp[vd_url] > repet_second data.vd_url in _tmp.keys()
) or vd_url not in _tmp.keys(): and time.time() - _tmp[data.vd_url] > repet_second
pic = vd_info.get("pic", "") # 封面 ) or data.vd_url not in _tmp.keys():
aid = vd_info.get("aid", "") # av号 pic = data.vd_info.get("pic", "") # 封面
title = vd_info.get("title", "") # 标题 aid = data.vd_info.get("aid", "") # av号
author = vd_info.get("owner", {}).get("name", "") # UP主 title = data.vd_info.get("title", "") # 标题
reply = vd_info.get("stat", {}).get("reply", "") # 回复 author = data.vd_info.get("owner", {}).get("name", "") # UP主
favorite = vd_info.get("stat", {}).get("favorite", "") # 收藏 reply = data.vd_info.get("stat", {}).get("reply", "") # 回复
coin = vd_info.get("stat", {}).get("coin", "") # 投币 favorite = data.vd_info.get("stat", {}).get("favorite", "") # 收藏
like = vd_info.get("stat", {}).get("like", "") # 点赞 coin = data.vd_info.get("stat", {}).get("coin", "") # 投币
danmuku = vd_info.get("stat", {}).get("danmaku", "") # 弹幕 like = data.vd_info.get("stat", {}).get("like", "") # 点赞
ctime = vd_info["ctime"] danmuku = data.vd_info.get("stat", {}).get("danmaku", "") # 弹幕
ctime = data.vd_info["ctime"]
date = time.strftime("%Y-%m-%d", time.localtime(ctime)) date = time.strftime("%Y-%m-%d", time.localtime(ctime))
logger.info(f"解析bilibili转发 {vd_url}", "b站解析", session=session) logger.info(
_tmp[vd_url] = time.time() f"解析bilibili转发 {data.vd_url}", "b站解析", session=session
)
_tmp[data.vd_url] = time.time()
_path = TEMP_PATH / f"{aid}.jpg" _path = TEMP_PATH / f"{aid}.jpg"
await AsyncHttpx.download_file(pic, _path) await AsyncHttpx.download_file(pic, _path)
await MessageUtils.build_message( await MessageUtils.build_message(
@ -145,33 +146,40 @@ async def _(session: EventSession, message: UniMsg):
] ]
).send() ).send()
elif live_info: elif data.live_info:
if ( if (
live_url in _tmp.keys() and time.time() - _tmp[live_url] > repet_second data.live_url in _tmp.keys()
) or live_url not in _tmp.keys(): and time.time() - _tmp[data.live_url] > repet_second
uid = live_info.get("uid", "") # 主播uid ) or data.live_url not in _tmp.keys():
title = live_info.get("title", "") # 直播间标题 uid = data.live_info.get("uid", "") # 主播uid
description = live_info.get("description", "") # 简介,可能会出现标签 title = data.live_info.get("title", "") # 直播间标题
user_cover = live_info.get("user_cover", "") # 封面 description = data.live_info.get(
keyframe = live_info.get("keyframe", "") # 关键帧画面 "description", ""
live_time = live_info.get("live_time", "") # 开播时间 ) # 简介,可能会出现标签
area_name = live_info.get("area_name", "") # 分区 user_cover = data.live_info.get("user_cover", "") # 封面
parent_area_name = live_info.get("parent_area_name", "") # 父分区 keyframe = data.live_info.get("keyframe", "") # 关键帧画面
logger.info(f"解析bilibili转发 {live_url}", "b站解析", session=session) live_time = data.live_info.get("live_time", "") # 开播时间
_tmp[live_url] = time.time() area_name = data.live_info.get("area_name", "") # 分区
parent_area_name = data.live_info.get("parent_area_name", "") # 父分区
logger.info(
f"解析bilibili转发 {data.live_url}", "b站解析", session=session
)
_tmp[data.live_url] = time.time()
await MessageUtils.build_message( await MessageUtils.build_message(
[ [
Image(url=user_cover), Image(url=user_cover),
f"开播用户https://space.bilibili.com/{uid}\n开播时间:{live_time}\n直播分区:{parent_area_name}——>{area_name}\n标题:{title}\n简介:{description}\n直播截图:\n", f"开播用户https://space.bilibili.com/{uid}\n开播时间:{live_time}\n直播分区:{parent_area_name}——>{area_name}\n标题:{title}\n简介:{description}\n直播截图:\n",
Image(url=keyframe), Image(url=keyframe),
f"{live_url}", f"{data.live_url}",
] ]
).send() ).send()
elif image_info: elif data.image_info:
if ( if (
image_url in _tmp.keys() data.image_url in _tmp.keys()
and time.time() - _tmp[image_url] > repet_second and time.time() - _tmp[data.image_url] > repet_second
) or image_url not in _tmp.keys(): ) or data.image_url not in _tmp.keys():
logger.info(f"解析bilibili转发 {image_url}", "b站解析", session=session) logger.info(
_tmp[image_url] = time.time() f"解析bilibili转发 {data.image_url}", "b站解析", session=session
await image_info.send() )
_tmp[data.image_url] = time.time()
await data.image_info.send()

View File

@ -1,5 +1,6 @@
import os import os
import re import re
from pathlib import Path
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
@ -11,14 +12,14 @@ from zhenxun.utils.message import MessageUtils
from zhenxun.utils.user_agent import get_user_agent_str from zhenxun.utils.user_agent import get_user_agent_str
async def resize(path: str): async def resize(path: Path):
"""调整图像大小的异步函数 """调整图像大小的异步函数
参数: 参数:
path (str): 图像文件路径 path (str): 图像文件路径
""" """
A = BuildImage(background=path) A = BuildImage.open(path)
await A.resize(0.5) await A.resize(0.8)
await A.save(path) await A.save(path)
@ -56,21 +57,18 @@ async def get_image(url) -> UniMessage | None:
# 根据编号构建保存路径 # 根据编号构建保存路径
if cv_number: if cv_number:
screenshot_path = f"{TEMP_PATH}/bilibili_cv_{cv_number}.png" screenshot_path = TEMP_PATH / "bilibili_cv_{cv_number}.png"
elif opus_number: elif opus_number:
screenshot_path = f"{TEMP_PATH}/bilibili_opus_{opus_number}.png" screenshot_path = TEMP_PATH / "bilibili_opus_{opus_number}.png"
elif t_opus_number: elif t_opus_number:
screenshot_path = f"{TEMP_PATH}/bilibili_opus_{t_opus_number}.png" screenshot_path = TEMP_PATH / "bilibili_opus_{t_opus_number}.png"
# t.bilibili.com和https://www.bilibili.com/opus在内容上是一样的为便于维护调整url至https://www.bilibili.com/opus/ # t.bilibili.com和https://www.bilibili.com/opus在内容上是一样的为便于维护调整url至https://www.bilibili.com/opus/
url = f"https://www.bilibili.com/opus/{t_opus_number}" url = f"https://www.bilibili.com/opus/{t_opus_number}"
if screenshot_path: if screenshot_path:
try: try:
# 如果文件不存在,进行截图 # 如果文件不存在,进行截图
if not os.path.exists(screenshot_path): if not screenshot_path.exists():
# 创建页面 # 创建页面
# random.choice(),从列表中随机抽取一个对象
user_agent = get_user_agent_str()
try: try:
async with AsyncPlaywright.new_page() as page: async with AsyncPlaywright.new_page() as page:
await page.set_viewport_size({"width": 5120, "height": 2560}) await page.set_viewport_size({"width": 5120, "height": 2560})

View File

@ -50,11 +50,4 @@ class InformationContainer:
setattr(self, f"_{info_type}", new_value) setattr(self, f"_{info_type}", new_value)
def get_information(self): def get_information(self):
return ( return self
self.vd_info,
self.live_info,
self.vd_url,
self.live_url,
self.image_info,
self.image_url,
)

View File

@ -0,0 +1,96 @@
import os
import platform
from pathlib import Path
import nonebot
from nonebot import on_command
from nonebot.adapters import Bot
from nonebot.params import ArgStr
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
__plugin_meta__ = PluginMetadata(
name="重启",
description="执行脚本重启真寻",
usage=f"""
重启
""".strip(),
extra=PluginExtraData(
author="HibiKier", version="0.1", plugin_type=PluginType.SUPERUSER
).dict(),
)
_matcher = on_command(
"重启",
permission=SUPERUSER,
rule=to_me(),
priority=1,
block=True,
)
driver = nonebot.get_driver()
RESTART_MARK = Path() / "is_restart"
RESTART_FILE = Path() / "restart.sh"
@_matcher.got(
"flag",
prompt=f"确定是否重启{NICKNAME}?确定请回复[是|好|确定](重启失败咱们将失去联系,请谨慎!)",
)
async def _(bot: Bot, session: EventSession, flag: str = ArgStr("flag")):
if flag.lower() in ["true", "", "", "确定", "确定是"]:
await MessageUtils.build_message(f"开始重启{NICKNAME}..请稍等...").send()
with open(RESTART_MARK, "w", encoding="utf8") as f:
f.write(f"{bot.self_id} {session.id1}")
logger.info("开始重启真寻...", "重启", session=session)
if str(platform.system()).lower() == "windows":
import sys
python = sys.executable
os.execl(python, python, *sys.argv)
else:
os.system("./restart.sh")
else:
await MessageUtils.build_message("已取消操作...").send()
@driver.on_bot_connect
async def _(bot: Bot):
if str(platform.system()).lower() != "windows":
if not RESTART_FILE.exists():
with open(RESTART_FILE, "w", encoding="utf8") as f:
f.write(
f"pid=$(netstat -tunlp | grep "
+ str(bot.config.port)
+ " | awk '{print $7}')\n"
"pid=${pid%/*}\n"
"kill -9 $pid\n"
"sleep 3\n"
"python3 bot.py"
)
os.system("chmod +x ./restart.sh")
logger.info(
"已自动生成 restart.sh(重启) 文件,请检查脚本是否与本地指令符合..."
)
if RESTART_MARK.exists():
with open(RESTART_MARK, "r", encoding="utf8") as f:
bot_id, session_id = f.read().split()
if bot := nonebot.get_bot(bot_id):
if target := PlatformUtils.get_target(bot, session_id):
await MessageUtils.build_message(f"{NICKNAME}已成功重启!").send(
target, bot=bot
)
RESTART_MARK.unlink()

View File

@ -27,6 +27,7 @@ from ....config import AVA_URL, GROUP_AVA_URL
from ....utils import authentication from ....utils import authentication
from ...logs.log_manager import LOG_STORAGE from ...logs.log_manager import LOG_STORAGE
from .model import ( from .model import (
ClearRequest,
DeleteFriend, DeleteFriend,
Friend, Friend,
FriendRequestResult, FriendRequestResult,
@ -143,8 +144,12 @@ async def _(bot_id: str) -> Result:
"/get_request_count", dependencies=[authentication()], description="获取请求数量" "/get_request_count", dependencies=[authentication()], description="获取请求数量"
) )
async def _() -> Result: async def _() -> Result:
f_count = await FgRequest.filter(request_type=RequestType.FRIEND).count() f_count = await FgRequest.filter(
g_count = await FgRequest.filter(request_type=RequestType.GROUP).count() request_type=RequestType.FRIEND, handle_type__isnull=True
).count()
g_count = await FgRequest.filter(
request_type=RequestType.GROUP, handle_type__isnull=True
).count()
data = { data = {
"friend_count": f_count, "friend_count": f_count,
"group_count": g_count, "group_count": g_count,
@ -196,13 +201,13 @@ async def _() -> Result:
return Result.ok(req_result, f"{NICKNAME}带来了最新的数据!") return Result.ok(req_result, f"{NICKNAME}带来了最新的数据!")
@router.delete( @router.post(
"/clear_request", dependencies=[authentication()], description="清空请求列表" "/clear_request", dependencies=[authentication()], description="清空请求列表"
) )
async def _(request_type: Literal["private", "group"]) -> Result: async def _(cr: ClearRequest) -> Result:
await FgRequest.filter(handle_type__not_isnull=True).update( await FgRequest.filter(
handle_type=RequestHandleType.IGNORE handle_type__isnull=True, request_type=cr.request_type
) ).update(handle_type=RequestHandleType.IGNORE)
return Result.ok(info="成功清除了数据!") return Result.ok(info="成功清除了数据!")

View File

@ -2,6 +2,8 @@ from typing import Literal
from pydantic import BaseModel from pydantic import BaseModel
from zhenxun.utils.enum import RequestType
class Group(BaseModel): class Group(BaseModel):
""" """
@ -123,6 +125,14 @@ class GroupRequestResult(FriendRequestResult):
"""群聊名称""" """群聊名称"""
class ClearRequest(BaseModel):
"""
清空请求
"""
request_type: RequestType
class HandleRequest(BaseModel): class HandleRequest(BaseModel):
""" """
操作请求接收数据 操作请求接收数据

View File

@ -5,117 +5,158 @@ from typing import List, Optional
from fastapi import APIRouter from fastapi import APIRouter
from zhenxun.utils._build_image import BuildImage
from ....base_model import Result from ....base_model import Result
from ....utils import authentication, get_system_disk from ....utils import authentication, get_system_disk
from .model import AddFile, DeleteFile, DirFile, RenameFile, SaveFile from .model import AddFile, DeleteFile, DirFile, RenameFile, SaveFile
router = APIRouter(prefix="/system") router = APIRouter(prefix="/system")
IMAGE_TYPE = ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"]
@router.get("/get_dir_list", dependencies=[authentication()], description="获取文件列表") @router.get(
"/get_dir_list", dependencies=[authentication()], description="获取文件列表"
)
async def _(path: Optional[str] = None) -> Result: async def _(path: Optional[str] = None) -> Result:
base_path = Path(path) if path else Path() base_path = Path(path) if path else Path()
data_list = [] data_list = []
for file in os.listdir(base_path): for file in os.listdir(base_path):
data_list.append(DirFile(is_file=not (base_path / file).is_dir(), name=file, parent=path)) file_path = base_path / file
return Result.ok(data_list) is_image = False
for t in IMAGE_TYPE:
if file.endswith(f".{t}"):
is_image = True
break
data_list.append(
DirFile(
is_file=not file_path.is_dir(),
is_image=is_image,
name=file,
parent=path,
)
)
return Result.ok(data_list)
@router.get("/get_resources_size", dependencies=[authentication()], description="获取文件列表") @router.get(
"/get_resources_size", dependencies=[authentication()], description="获取文件列表"
)
async def _(full_path: Optional[str] = None) -> Result: async def _(full_path: Optional[str] = None) -> Result:
return Result.ok(await get_system_disk(full_path)) return Result.ok(await get_system_disk(full_path))
@router.post("/delete_file", dependencies=[authentication()], description="删除文件") @router.post("/delete_file", dependencies=[authentication()], description="删除文件")
async def _(param: DeleteFile) -> Result: async def _(param: DeleteFile) -> Result:
path = Path(param.full_path) path = Path(param.full_path)
if not path or not path.exists(): if not path or not path.exists():
return Result.warning_("文件不存在...") return Result.warning_("文件不存在...")
try: try:
path.unlink() path.unlink()
return Result.ok('删除成功!') return Result.ok("删除成功!")
except Exception as e: except Exception as e:
return Result.warning_('删除失败: ' + str(e)) return Result.warning_("删除失败: " + str(e))
@router.post("/delete_folder", dependencies=[authentication()], description="删除文件夹")
@router.post(
"/delete_folder", dependencies=[authentication()], description="删除文件夹"
)
async def _(param: DeleteFile) -> Result: async def _(param: DeleteFile) -> Result:
path = Path(param.full_path) path = Path(param.full_path)
if not path or not path.exists() or path.is_file(): if not path or not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...") return Result.warning_("文件夹不存在...")
try: try:
shutil.rmtree(path.absolute()) shutil.rmtree(path.absolute())
return Result.ok('删除成功!') return Result.ok("删除成功!")
except Exception as e: except Exception as e:
return Result.warning_('删除失败: ' + str(e)) return Result.warning_("删除失败: " + str(e))
@router.post("/rename_file", dependencies=[authentication()], description="重命名文件") @router.post("/rename_file", dependencies=[authentication()], description="重命名文件")
async def _(param: RenameFile) -> Result: async def _(param: RenameFile) -> Result:
path = (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name) path = (
if not path or not path.exists(): (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
return Result.warning_("文件不存在...") )
try: if not path or not path.exists():
path.rename(path.parent / param.name) return Result.warning_("文件不存在...")
return Result.ok('重命名成功!') try:
except Exception as e: path.rename(path.parent / param.name)
return Result.warning_('重命名失败: ' + str(e)) return Result.ok("重命名成功!")
except Exception as e:
return Result.warning_("重命名失败: " + str(e))
@router.post("/rename_folder", dependencies=[authentication()], description="重命名文件夹")
@router.post(
"/rename_folder", dependencies=[authentication()], description="重命名文件夹"
)
async def _(param: RenameFile) -> Result: async def _(param: RenameFile) -> Result:
path = (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name) path = (
if not path or not path.exists() or path.is_file(): (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
return Result.warning_("文件夹不存在...") )
try: if not path or not path.exists() or path.is_file():
new_path = path.parent / param.name return Result.warning_("文件夹不存在...")
shutil.move(path.absolute(), new_path.absolute()) try:
return Result.ok('重命名成功!') new_path = path.parent / param.name
except Exception as e: shutil.move(path.absolute(), new_path.absolute())
return Result.warning_('重命名失败: ' + str(e)) return Result.ok("重命名成功!")
except Exception as e:
return Result.warning_("重命名失败: " + str(e))
@router.post("/add_file", dependencies=[authentication()], description="新建文件") @router.post("/add_file", dependencies=[authentication()], description="新建文件")
async def _(param: AddFile) -> Result: async def _(param: AddFile) -> Result:
path = (Path(param.parent) / param.name) if param.parent else Path(param.name) path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
if path.exists(): if path.exists():
return Result.warning_("文件已存在...") return Result.warning_("文件已存在...")
try: try:
path.open('w') path.open("w")
return Result.ok('新建文件成功!') return Result.ok("新建文件成功!")
except Exception as e: except Exception as e:
return Result.warning_('新建文件失败: ' + str(e)) return Result.warning_("新建文件失败: " + str(e))
@router.post("/add_folder", dependencies=[authentication()], description="新建文件夹") @router.post("/add_folder", dependencies=[authentication()], description="新建文件夹")
async def _(param: AddFile) -> Result: async def _(param: AddFile) -> Result:
path = (Path(param.parent) / param.name) if param.parent else Path(param.name) path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
if path.exists(): if path.exists():
return Result.warning_("文件夹已存在...") return Result.warning_("文件夹已存在...")
try: try:
path.mkdir() path.mkdir()
return Result.ok('新建文件夹成功!') return Result.ok("新建文件夹成功!")
except Exception as e: except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e)) return Result.warning_("新建文件夹失败: " + str(e))
@router.get("/read_file", dependencies=[authentication()], description="读取文件") @router.get("/read_file", dependencies=[authentication()], description="读取文件")
async def _(full_path: str) -> Result: async def _(full_path: str) -> Result:
path = Path(full_path) path = Path(full_path)
if not path.exists(): if not path.exists():
return Result.warning_("文件不存在...") return Result.warning_("文件不存在...")
try: try:
text = path.read_text(encoding='utf-8') text = path.read_text(encoding="utf-8")
return Result.ok(text) return Result.ok(text)
except Exception as e: except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e)) return Result.warning_("读取文件失败: " + str(e))
@router.post("/save_file", dependencies=[authentication()], description="读取文件") @router.post("/save_file", dependencies=[authentication()], description="读取文件")
async def _(param: SaveFile) -> Result: async def _(param: SaveFile) -> Result:
path = Path(param.full_path) path = Path(param.full_path)
try: try:
with path.open('w') as f: with path.open("w") as f:
f.write(param.content) f.write(param.content)
return Result.ok("更新成功!") return Result.ok("更新成功!")
except Exception as e: except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e)) return Result.warning_("保存文件失败: " + str(e))
@router.get("/get_image", dependencies=[authentication()], description="读取图片base64")
async def _(full_path: str) -> Result:
path = Path(full_path)
if not path.exists():
return Result.warning_("文件不存在...")
try:
return Result.ok(BuildImage.open(path).pic2bs4())
except Exception as e:
return Result.warning_("获取图片失败: " + str(e))

View File

@ -1,6 +1,3 @@
from datetime import datetime from datetime import datetime
from typing import Literal, Optional from typing import Literal, Optional
@ -8,57 +5,59 @@ from pydantic import BaseModel
class DirFile(BaseModel): class DirFile(BaseModel):
"""
文件或文件夹
"""
""" is_file: bool
文件或文件夹 """是否为文件"""
""" is_image: bool
"""是否为图片"""
name: str
"""文件夹或文件名称"""
parent: Optional[str] = None
"""父级"""
is_file: bool
"""是否为文件"""
name: str
"""文件夹或文件名称"""
parent: Optional[str] = None
"""父级"""
class DeleteFile(BaseModel): class DeleteFile(BaseModel):
"""
删除文件
"""
""" full_path: str
删除文件 """文件全路径"""
"""
full_path: str
"""文件全路径"""
class RenameFile(BaseModel): class RenameFile(BaseModel):
"""
删除文件
"""
""" parent: Optional[str]
删除文件 """父路径"""
""" old_name: str
parent: Optional[str] """旧名称"""
"""父路径""" name: str
old_name: str """新名称"""
"""旧名称"""
name: str
"""新名称"""
class AddFile(BaseModel): class AddFile(BaseModel):
"""
新建文件
"""
""" parent: Optional[str]
新建文件 """父路径"""
""" name: str
parent: Optional[str] """新名称"""
"""父路径"""
name: str
"""新名称"""
class SaveFile(BaseModel): class SaveFile(BaseModel):
"""
""" 保存文件
保存文件 """
"""
full_path: str full_path: str
"""全路径""" """全路径"""
content: str content: str
"""内容""" """内容"""

View File

@ -123,13 +123,12 @@ class WordBank(Model):
# 对图片做额外处理 # 对图片做额外处理
image_path = None image_path = None
if word_type == 3: if word_type == 3:
_file = ( _uuid = uuid.uuid1()
path / "problem" / f"{group_id}" / f"{user_id}_{int(time.time())}.jpg" _file = path / "problem" / f"{group_id}" / f"{user_id}_{_uuid}.jpg"
)
_file.parent.mkdir(exist_ok=True, parents=True) _file.parent.mkdir(exist_ok=True, parents=True)
await AsyncHttpx.download_file(problem, _file) await AsyncHttpx.download_file(problem, _file)
problem = get_img_hash(_file) problem = get_img_hash(_file)
image_path = f"problem/{group_id}/{user_id}_{int(time.time())}.jpg" image_path = f"problem/{group_id}/{user_id}_{_uuid}.jpg"
new_answer, placeholder_list = await cls._answer2format( new_answer, placeholder_list = await cls._answer2format(
answer, user_id, group_id answer, user_id, group_id
) )