Merge pull request #1549 from HibiKier/dev

Dev
This commit is contained in:
HibiKier 2024-08-12 22:36:01 +08:00 committed by GitHub
commit 1be99ac64d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 437 additions and 210 deletions

View File

@ -1,8 +1,10 @@
import nonebot
from nonebot import on_notice
from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import GroupIncreaseNoticeEvent
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Arparma, on_alconna
from nonebot_plugin_apscheduler import scheduler
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME
@ -10,6 +12,7 @@ from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.rules import admin_check, ensure_group
from ._data_source import MemberUpdateManage
@ -39,6 +42,9 @@ _matcher = on_alconna(
)
_notice = on_notice(priority=1, block=False)
@_matcher.handle()
async def _(bot: Bot, session: EventSession, arparma: Arparma):
if gid := session.id3 or session.id2:
@ -50,9 +56,6 @@ async def _(bot: Bot, session: EventSession, arparma: Arparma):
await MessageUtils.build_message("群组id为空...").send()
_notice = on_notice(priority=1, block=False)
@_notice.handle()
async def _(bot: Bot, event: GroupIncreaseNoticeEvent):
# TODO: 其他适配器的加群自动更新群组成员信息
@ -64,3 +67,28 @@ async def _(bot: Bot, event: GroupIncreaseNoticeEvent):
session=event.user_id,
group_id=event.group_id,
)
@scheduler.scheduled_job(
"interval",
minutes=5,
)
async def _():
for bot in nonebot.get_bots().values():
if PlatformUtils.get_platform(bot) == "qq":
try:
group_list, _ = await PlatformUtils.get_group_list(bot)
if group_list:
for group in group_list:
try:
await MemberUpdateManage.update(bot, group.group_id)
logger.debug("自动更新群组成员信息成功...")
except Exception as e:
logger.error(
f"Bot: {bot.self_id} 自动更新群组成员信息成功",
target=group.group_id,
e=e,
)
except Exception as e:
logger.error(f"Bot: {bot.self_id} 自动更新群组信息", e=e)
logger.debug(f"自动 Bot: {bot.self_id} 更新群组成员信息成功...")

View File

@ -236,6 +236,9 @@ class AuthChecker:
LimitManage.unblock(
matcher.plugin.name, user_id, group_id, channel_id
)
except AssertionError as e:
is_ignore = True
logger.debug("消息无法发送", session=session, e=e)
if cost_gold and user_id:
"""花费金币"""
try:

View File

@ -23,13 +23,15 @@ async def _(bot: Bot):
if PlatformUtils.get_platform(bot) == "qq":
logger.debug(f"更新Bot: {bot.self_id} 的群认证...")
group_list, _ = await PlatformUtils.get_group_list(bot)
gid_list = [g.group_id for g in group_list]
gid_list = [(g.group_id, g.group_name) for g in group_list]
db_group_list = await GroupConsole.all().values_list("group_id", flat=True)
create_list = []
update_id = []
for gid in gid_list:
for gid, name in gid_list:
if gid not in db_group_list:
create_list.append(GroupConsole(group_id=gid, group_flag=1))
create_list.append(
GroupConsole(group_id=gid, group_name=name, group_flag=1)
)
else:
update_id.append(gid)
if create_list:

View File

@ -231,7 +231,7 @@ async def _(bot: Bot, event: GroupIncreaseNoticeEvent | GroupMemberIncreaseEvent
"""群欢迎消息"""
_flmt.start_cd(group_id)
path = DATA_PATH / "welcome_message" / "qq" / f"{group_id}"
data = json.load((path / "text.json").open(encoding="utf8"))
data = json.load((path / "text.json").open(encoding="utf-8"))
message = data["message"]
msg_split = re.split(r"\[image:\d+\]", message)
msg_list = []

View File

@ -17,7 +17,7 @@ async def _():
try:
await PlatformUtils.update_group(bot)
except Exception as e:
logger.error(f"Bot: {bot.self_id} 自动更新群组信息", e=e)
logger.error(f"Bot: {bot.self_id} 自动更新群组信息", "自动更新群组", e=e)
logger.info("自动更新群组成员信息成功...")
@ -33,5 +33,7 @@ async def _():
try:
await PlatformUtils.update_friend(bot)
except Exception as e:
logger.error(f"自动更新好友信息错误", "自动更新好友", e=e)
logger.error(
f"Bot: {bot.self_id} 自动更新好友信息错误", "自动更新好友", e=e
)
logger.info("自动更新好友信息成功...")

View File

@ -3,9 +3,7 @@ from io import BytesIO
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args
from nonebot_plugin_alconna import At as alcAt
from nonebot_plugin_alconna import Match, on_alconna
from nonebot_plugin_alconna import Alconna, Args, At, Match, on_alconna
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData
@ -29,7 +27,7 @@ __plugin_meta__ = PluginMetadata(
)
_matcher = on_alconna(
Alconna("one-friend", Args["text", str]["at?", alcAt]), priority=5, block=True
Alconna("one-friend", Args["text", str]["at?", At]), priority=5, block=True
)
_matcher.shortcut(
@ -41,7 +39,7 @@ _matcher.shortcut(
@_matcher.handle()
async def _(bot: Bot, text: str, at: Match[alcAt], session: EventSession):
async def _(bot: Bot, text: str, at: Match[At], session: EventSession):
gid = session.id3 or session.id2
if not gid:
await MessageUtils.build_message("群组id为空...").finish()

View File

@ -115,27 +115,28 @@ async def _(session: EventSession, message: UniMsg):
if get_url:
# 将链接统一发送给处理函数
vd_info, live_info, vd_url, live_url, image_info, image_url = (
await parse_bili_url(get_url, information_container)
)
if vd_info:
data = await parse_bili_url(get_url, information_container)
if data.vd_info:
# 判断一定时间内是否解析重复内容,或者是第一次解析
if (
vd_url in _tmp.keys() and time.time() - _tmp[vd_url] > repet_second
) or vd_url not in _tmp.keys():
pic = vd_info.get("pic", "") # 封面
aid = vd_info.get("aid", "") # av号
title = vd_info.get("title", "") # 标题
author = vd_info.get("owner", {}).get("name", "") # UP主
reply = vd_info.get("stat", {}).get("reply", "") # 回复
favorite = vd_info.get("stat", {}).get("favorite", "") # 收藏
coin = vd_info.get("stat", {}).get("coin", "") # 投币
like = vd_info.get("stat", {}).get("like", "") # 点赞
danmuku = vd_info.get("stat", {}).get("danmaku", "") # 弹幕
ctime = vd_info["ctime"]
data.vd_url in _tmp.keys()
and time.time() - _tmp[data.vd_url] > repet_second
) or data.vd_url not in _tmp.keys():
pic = data.vd_info.get("pic", "") # 封面
aid = data.vd_info.get("aid", "") # av号
title = data.vd_info.get("title", "") # 标题
author = data.vd_info.get("owner", {}).get("name", "") # UP主
reply = data.vd_info.get("stat", {}).get("reply", "") # 回复
favorite = data.vd_info.get("stat", {}).get("favorite", "") # 收藏
coin = data.vd_info.get("stat", {}).get("coin", "") # 投币
like = data.vd_info.get("stat", {}).get("like", "") # 点赞
danmuku = data.vd_info.get("stat", {}).get("danmaku", "") # 弹幕
ctime = data.vd_info["ctime"]
date = time.strftime("%Y-%m-%d", time.localtime(ctime))
logger.info(f"解析bilibili转发 {vd_url}", "b站解析", session=session)
_tmp[vd_url] = time.time()
logger.info(
f"解析bilibili转发 {data.vd_url}", "b站解析", session=session
)
_tmp[data.vd_url] = time.time()
_path = TEMP_PATH / f"{aid}.jpg"
await AsyncHttpx.download_file(pic, _path)
await MessageUtils.build_message(
@ -145,33 +146,40 @@ async def _(session: EventSession, message: UniMsg):
]
).send()
elif live_info:
elif data.live_info:
if (
live_url in _tmp.keys() and time.time() - _tmp[live_url] > repet_second
) or live_url not in _tmp.keys():
uid = live_info.get("uid", "") # 主播uid
title = live_info.get("title", "") # 直播间标题
description = live_info.get("description", "") # 简介,可能会出现标签
user_cover = live_info.get("user_cover", "") # 封面
keyframe = live_info.get("keyframe", "") # 关键帧画面
live_time = live_info.get("live_time", "") # 开播时间
area_name = live_info.get("area_name", "") # 分区
parent_area_name = live_info.get("parent_area_name", "") # 父分区
logger.info(f"解析bilibili转发 {live_url}", "b站解析", session=session)
_tmp[live_url] = time.time()
data.live_url in _tmp.keys()
and time.time() - _tmp[data.live_url] > repet_second
) or data.live_url not in _tmp.keys():
uid = data.live_info.get("uid", "") # 主播uid
title = data.live_info.get("title", "") # 直播间标题
description = data.live_info.get(
"description", ""
) # 简介,可能会出现标签
user_cover = data.live_info.get("user_cover", "") # 封面
keyframe = data.live_info.get("keyframe", "") # 关键帧画面
live_time = data.live_info.get("live_time", "") # 开播时间
area_name = data.live_info.get("area_name", "") # 分区
parent_area_name = data.live_info.get("parent_area_name", "") # 父分区
logger.info(
f"解析bilibili转发 {data.live_url}", "b站解析", session=session
)
_tmp[data.live_url] = time.time()
await MessageUtils.build_message(
[
Image(url=user_cover),
f"开播用户https://space.bilibili.com/{uid}\n开播时间:{live_time}\n直播分区:{parent_area_name}——>{area_name}\n标题:{title}\n简介:{description}\n直播截图:\n",
Image(url=keyframe),
f"{live_url}",
f"{data.live_url}",
]
).send()
elif image_info:
elif data.image_info:
if (
image_url in _tmp.keys()
and time.time() - _tmp[image_url] > repet_second
) or image_url not in _tmp.keys():
logger.info(f"解析bilibili转发 {image_url}", "b站解析", session=session)
_tmp[image_url] = time.time()
await image_info.send()
data.image_url in _tmp.keys()
and time.time() - _tmp[data.image_url] > repet_second
) or data.image_url not in _tmp.keys():
logger.info(
f"解析bilibili转发 {data.image_url}", "b站解析", session=session
)
_tmp[data.image_url] = time.time()
await data.image_info.send()

View File

@ -1,5 +1,6 @@
import os
import re
from pathlib import Path
from nonebot_plugin_alconna import UniMessage
@ -11,14 +12,14 @@ from zhenxun.utils.message import MessageUtils
from zhenxun.utils.user_agent import get_user_agent_str
async def resize(path: str):
async def resize(path: Path):
"""调整图像大小的异步函数
参数:
path (str): 图像文件路径
"""
A = BuildImage(background=path)
await A.resize(0.5)
A = BuildImage.open(path)
await A.resize(0.8)
await A.save(path)
@ -56,21 +57,18 @@ async def get_image(url) -> UniMessage | None:
# 根据编号构建保存路径
if cv_number:
screenshot_path = f"{TEMP_PATH}/bilibili_cv_{cv_number}.png"
screenshot_path = TEMP_PATH / "bilibili_cv_{cv_number}.png"
elif opus_number:
screenshot_path = f"{TEMP_PATH}/bilibili_opus_{opus_number}.png"
screenshot_path = TEMP_PATH / "bilibili_opus_{opus_number}.png"
elif t_opus_number:
screenshot_path = f"{TEMP_PATH}/bilibili_opus_{t_opus_number}.png"
screenshot_path = TEMP_PATH / "bilibili_opus_{t_opus_number}.png"
# t.bilibili.com和https://www.bilibili.com/opus在内容上是一样的为便于维护调整url至https://www.bilibili.com/opus/
url = f"https://www.bilibili.com/opus/{t_opus_number}"
if screenshot_path:
try:
# 如果文件不存在,进行截图
if not os.path.exists(screenshot_path):
if not screenshot_path.exists():
# 创建页面
# random.choice(),从列表中随机抽取一个对象
user_agent = get_user_agent_str()
try:
async with AsyncPlaywright.new_page() as page:
await page.set_viewport_size({"width": 5120, "height": 2560})

View File

@ -50,11 +50,4 @@ class InformationContainer:
setattr(self, f"_{info_type}", new_value)
def get_information(self):
return (
self.vd_info,
self.live_info,
self.vd_url,
self.live_url,
self.image_info,
self.image_url,
)
return self

View File

@ -0,0 +1,96 @@
import os
import platform
from pathlib import Path
import nonebot
from nonebot import on_command
from nonebot.adapters import Bot
from nonebot.params import ArgStr
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
__plugin_meta__ = PluginMetadata(
name="重启",
description="执行脚本重启真寻",
usage=f"""
重启
""".strip(),
extra=PluginExtraData(
author="HibiKier", version="0.1", plugin_type=PluginType.SUPERUSER
).dict(),
)
_matcher = on_command(
"重启",
permission=SUPERUSER,
rule=to_me(),
priority=1,
block=True,
)
driver = nonebot.get_driver()
RESTART_MARK = Path() / "is_restart"
RESTART_FILE = Path() / "restart.sh"
@_matcher.got(
"flag",
prompt=f"确定是否重启{NICKNAME}?确定请回复[是|好|确定](重启失败咱们将失去联系,请谨慎!)",
)
async def _(bot: Bot, session: EventSession, flag: str = ArgStr("flag")):
if flag.lower() in ["true", "", "", "确定", "确定是"]:
await MessageUtils.build_message(f"开始重启{NICKNAME}..请稍等...").send()
with open(RESTART_MARK, "w", encoding="utf8") as f:
f.write(f"{bot.self_id} {session.id1}")
logger.info("开始重启真寻...", "重启", session=session)
if str(platform.system()).lower() == "windows":
import sys
python = sys.executable
os.execl(python, python, *sys.argv)
else:
os.system("./restart.sh")
else:
await MessageUtils.build_message("已取消操作...").send()
@driver.on_bot_connect
async def _(bot: Bot):
if str(platform.system()).lower() != "windows":
if not RESTART_FILE.exists():
with open(RESTART_FILE, "w", encoding="utf8") as f:
f.write(
f"pid=$(netstat -tunlp | grep "
+ str(bot.config.port)
+ " | awk '{print $7}')\n"
"pid=${pid%/*}\n"
"kill -9 $pid\n"
"sleep 3\n"
"python3 bot.py"
)
os.system("chmod +x ./restart.sh")
logger.info(
"已自动生成 restart.sh(重启) 文件,请检查脚本是否与本地指令符合..."
)
if RESTART_MARK.exists():
with open(RESTART_MARK, "r", encoding="utf8") as f:
bot_id, session_id = f.read().split()
if bot := nonebot.get_bot(bot_id):
if target := PlatformUtils.get_target(bot, session_id):
await MessageUtils.build_message(f"{NICKNAME}已成功重启!").send(
target, bot=bot
)
RESTART_MARK.unlink()

View File

@ -27,6 +27,7 @@ from ....config import AVA_URL, GROUP_AVA_URL
from ....utils import authentication
from ...logs.log_manager import LOG_STORAGE
from .model import (
ClearRequest,
DeleteFriend,
Friend,
FriendRequestResult,
@ -143,8 +144,12 @@ async def _(bot_id: str) -> Result:
"/get_request_count", dependencies=[authentication()], description="获取请求数量"
)
async def _() -> Result:
f_count = await FgRequest.filter(request_type=RequestType.FRIEND).count()
g_count = await FgRequest.filter(request_type=RequestType.GROUP).count()
f_count = await FgRequest.filter(
request_type=RequestType.FRIEND, handle_type__isnull=True
).count()
g_count = await FgRequest.filter(
request_type=RequestType.GROUP, handle_type__isnull=True
).count()
data = {
"friend_count": f_count,
"group_count": g_count,
@ -196,13 +201,13 @@ async def _() -> Result:
return Result.ok(req_result, f"{NICKNAME}带来了最新的数据!")
@router.delete(
@router.post(
"/clear_request", dependencies=[authentication()], description="清空请求列表"
)
async def _(request_type: Literal["private", "group"]) -> Result:
await FgRequest.filter(handle_type__not_isnull=True).update(
handle_type=RequestHandleType.IGNORE
)
async def _(cr: ClearRequest) -> Result:
await FgRequest.filter(
handle_type__isnull=True, request_type=cr.request_type
).update(handle_type=RequestHandleType.IGNORE)
return Result.ok(info="成功清除了数据!")

View File

@ -2,6 +2,8 @@ from typing import Literal
from pydantic import BaseModel
from zhenxun.utils.enum import RequestType
class Group(BaseModel):
"""
@ -123,6 +125,14 @@ class GroupRequestResult(FriendRequestResult):
"""群聊名称"""
class ClearRequest(BaseModel):
"""
清空请求
"""
request_type: RequestType
class HandleRequest(BaseModel):
"""
操作请求接收数据

View File

@ -5,117 +5,158 @@ from typing import List, Optional
from fastapi import APIRouter
from zhenxun.utils._build_image import BuildImage
from ....base_model import Result
from ....utils import authentication, get_system_disk
from .model import AddFile, DeleteFile, DirFile, RenameFile, SaveFile
router = APIRouter(prefix="/system")
IMAGE_TYPE = ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"]
@router.get("/get_dir_list", dependencies=[authentication()], description="获取文件列表")
@router.get(
"/get_dir_list", dependencies=[authentication()], description="获取文件列表"
)
async def _(path: Optional[str] = None) -> Result:
base_path = Path(path) if path else Path()
data_list = []
for file in os.listdir(base_path):
data_list.append(DirFile(is_file=not (base_path / file).is_dir(), name=file, parent=path))
return Result.ok(data_list)
base_path = Path(path) if path else Path()
data_list = []
for file in os.listdir(base_path):
file_path = base_path / file
is_image = False
for t in IMAGE_TYPE:
if file.endswith(f".{t}"):
is_image = True
break
data_list.append(
DirFile(
is_file=not file_path.is_dir(),
is_image=is_image,
name=file,
parent=path,
)
)
return Result.ok(data_list)
@router.get("/get_resources_size", dependencies=[authentication()], description="获取文件列表")
@router.get(
"/get_resources_size", dependencies=[authentication()], description="获取文件列表"
)
async def _(full_path: Optional[str] = None) -> Result:
return Result.ok(await get_system_disk(full_path))
return Result.ok(await get_system_disk(full_path))
@router.post("/delete_file", dependencies=[authentication()], description="删除文件")
async def _(param: DeleteFile) -> Result:
path = Path(param.full_path)
if not path or not path.exists():
return Result.warning_("文件不存在...")
try:
path.unlink()
return Result.ok('删除成功!')
except Exception as e:
return Result.warning_('删除失败: ' + str(e))
@router.post("/delete_folder", dependencies=[authentication()], description="删除文件夹")
path = Path(param.full_path)
if not path or not path.exists():
return Result.warning_("文件不存在...")
try:
path.unlink()
return Result.ok("删除成功!")
except Exception as e:
return Result.warning_("删除失败: " + str(e))
@router.post(
"/delete_folder", dependencies=[authentication()], description="删除文件夹"
)
async def _(param: DeleteFile) -> Result:
path = Path(param.full_path)
if not path or not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...")
try:
shutil.rmtree(path.absolute())
return Result.ok('删除成功!')
except Exception as e:
return Result.warning_('删除失败: ' + str(e))
path = Path(param.full_path)
if not path or not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...")
try:
shutil.rmtree(path.absolute())
return Result.ok("删除成功!")
except Exception as e:
return Result.warning_("删除失败: " + str(e))
@router.post("/rename_file", dependencies=[authentication()], description="重命名文件")
async def _(param: RenameFile) -> Result:
path = (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
if not path or not path.exists():
return Result.warning_("文件不存在...")
try:
path.rename(path.parent / param.name)
return Result.ok('重命名成功!')
except Exception as e:
return Result.warning_('重命名失败: ' + str(e))
path = (
(Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
)
if not path or not path.exists():
return Result.warning_("文件不存在...")
try:
path.rename(path.parent / param.name)
return Result.ok("重命名成功!")
except Exception as e:
return Result.warning_("重命名失败: " + str(e))
@router.post("/rename_folder", dependencies=[authentication()], description="重命名文件夹")
@router.post(
"/rename_folder", dependencies=[authentication()], description="重命名文件夹"
)
async def _(param: RenameFile) -> Result:
path = (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
if not path or not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...")
try:
new_path = path.parent / param.name
shutil.move(path.absolute(), new_path.absolute())
return Result.ok('重命名成功!')
except Exception as e:
return Result.warning_('重命名失败: ' + str(e))
path = (
(Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
)
if not path or not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...")
try:
new_path = path.parent / param.name
shutil.move(path.absolute(), new_path.absolute())
return Result.ok("重命名成功!")
except Exception as e:
return Result.warning_("重命名失败: " + str(e))
@router.post("/add_file", dependencies=[authentication()], description="新建文件")
async def _(param: AddFile) -> Result:
path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
if path.exists():
return Result.warning_("文件已存在...")
try:
path.open('w')
return Result.ok('新建文件成功!')
except Exception as e:
return Result.warning_('新建文件失败: ' + str(e))
path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
if path.exists():
return Result.warning_("文件已存在...")
try:
path.open("w")
return Result.ok("新建文件成功!")
except Exception as e:
return Result.warning_("新建文件失败: " + str(e))
@router.post("/add_folder", dependencies=[authentication()], description="新建文件夹")
async def _(param: AddFile) -> Result:
path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
if path.exists():
return Result.warning_("文件夹已存在...")
try:
path.mkdir()
return Result.ok('新建文件夹成功!')
except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e))
path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
if path.exists():
return Result.warning_("文件夹已存在...")
try:
path.mkdir()
return Result.ok("新建文件夹成功!")
except Exception as e:
return Result.warning_("新建文件夹失败: " + str(e))
@router.get("/read_file", dependencies=[authentication()], description="读取文件")
async def _(full_path: str) -> Result:
path = Path(full_path)
if not path.exists():
return Result.warning_("文件不存在...")
try:
text = path.read_text(encoding='utf-8')
return Result.ok(text)
except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e))
path = Path(full_path)
if not path.exists():
return Result.warning_("文件不存在...")
try:
text = path.read_text(encoding="utf-8")
return Result.ok(text)
except Exception as e:
return Result.warning_("读取文件失败: " + str(e))
@router.post("/save_file", dependencies=[authentication()], description="读取文件")
async def _(param: SaveFile) -> Result:
path = Path(param.full_path)
try:
with path.open('w') as f:
f.write(param.content)
return Result.ok("更新成功!")
except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e))
path = Path(param.full_path)
try:
with path.open("w") as f:
f.write(param.content)
return Result.ok("更新成功!")
except Exception as e:
return Result.warning_("保存文件失败: " + str(e))
@router.get("/get_image", dependencies=[authentication()], description="读取图片base64")
async def _(full_path: str) -> Result:
path = Path(full_path)
if not path.exists():
return Result.warning_("文件不存在...")
try:
return Result.ok(BuildImage.open(path).pic2bs4())
except Exception as e:
return Result.warning_("获取图片失败: " + str(e))

View File

@ -1,6 +1,3 @@
from datetime import datetime
from typing import Literal, Optional
@ -8,57 +5,59 @@ from pydantic import BaseModel
class DirFile(BaseModel):
"""
文件或文件夹
"""
"""
文件或文件夹
"""
is_file: bool
"""是否为文件"""
is_image: bool
"""是否为图片"""
name: str
"""文件夹或文件名称"""
parent: Optional[str] = None
"""父级"""
is_file: bool
"""是否为文件"""
name: str
"""文件夹或文件名称"""
parent: Optional[str] = None
"""父级"""
class DeleteFile(BaseModel):
"""
删除文件
"""
"""
删除文件
"""
full_path: str
"""文件全路径"""
full_path: str
"""文件全路径"""
class RenameFile(BaseModel):
"""
删除文件
"""
"""
删除文件
"""
parent: Optional[str]
"""父路径"""
old_name: str
"""旧名称"""
name: str
"""新名称"""
parent: Optional[str]
"""父路径"""
old_name: str
"""旧名称"""
name: str
"""新名称"""
class AddFile(BaseModel):
"""
新建文件
"""
"""
新建文件
"""
parent: Optional[str]
"""父路径"""
name: str
"""新名称"""
parent: Optional[str]
"""父路径"""
name: str
"""新名称"""
class SaveFile(BaseModel):
"""
保存文件
"""
full_path: str
"""全路径"""
content: str
"""内容"""
"""
保存文件
"""
full_path: str
"""全路径"""
content: str
"""内容"""

View File

@ -123,13 +123,12 @@ class WordBank(Model):
# 对图片做额外处理
image_path = None
if word_type == 3:
_file = (
path / "problem" / f"{group_id}" / f"{user_id}_{int(time.time())}.jpg"
)
_uuid = uuid.uuid1()
_file = path / "problem" / f"{group_id}" / f"{user_id}_{_uuid}.jpg"
_file.parent.mkdir(exist_ok=True, parents=True)
await AsyncHttpx.download_file(problem, _file)
problem = get_img_hash(_file)
image_path = f"problem/{group_id}/{user_id}_{int(time.time())}.jpg"
image_path = f"problem/{group_id}/{user_id}_{_uuid}.jpg"
new_answer, placeholder_list = await cls._answer2format(
answer, user_id, group_id
)

View File

@ -1,5 +1,10 @@
import os
import sys
from nonebot import get_driver
from playwright.__main__ import main
from playwright.async_api import Browser, Playwright, async_playwright
from zhenxun.configs.config import SYSTEM_PROXY
from zhenxun.services.log import logger
@ -13,6 +18,8 @@ _browser: Browser | None = None
async def start_browser():
global _playwright
global _browser
install()
await check_playwright_env()
_playwright = await async_playwright().start()
_browser = await _playwright.chromium.launch()
@ -33,13 +40,51 @@ def get_browser() -> Browser:
def install():
"""自动安装、更新 Chromium"""
logger.info("正在检查 Chromium 更新")
import sys
from playwright.__main__ import main
def set_env_variables():
os.environ[
"PLAYWRIGHT_DOWNLOAD_HOST"] = "https://npmmirror.com/mirrors/playwright/"
if SYSTEM_PROXY:
os.environ["HTTPS_PROXY"] = SYSTEM_PROXY
sys.argv = ["", "install", "chromium"]
def restore_env_variables():
os.environ.pop("PLAYWRIGHT_DOWNLOAD_HOST", None)
if SYSTEM_PROXY:
os.environ.pop("HTTPS_PROXY", None)
if original_proxy is not None:
os.environ["HTTPS_PROXY"] = original_proxy
def try_install_chromium():
try:
sys.argv = ["", "install", "chromium"]
main()
except SystemExit as e:
return e.code == 0
return False
logger.info("检查 Chromium 更新")
original_proxy = os.environ.get("HTTPS_PROXY")
set_env_variables()
success = try_install_chromium()
if not success:
logger.info("Chromium 更新失败,尝试从原始仓库下载,速度较慢")
os.environ["PLAYWRIGHT_DOWNLOAD_HOST"] = ""
success = try_install_chromium()
restore_env_variables()
if not success:
raise RuntimeError("未知错误Chromium 下载失败")
async def check_playwright_env():
"""检查 Playwright 依赖"""
logger.info("检查 Playwright 依赖")
try:
main()
except SystemExit:
pass
async with async_playwright() as p:
await p.chromium.launch()
except Exception as e:
raise ImportError("加载失败Playwright 依赖不全,") from e