mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-14 21:52:56 +08:00
update v0.1.5.7
This commit is contained in:
parent
47c87f8694
commit
731bde8e28
@ -243,6 +243,11 @@ __Docker 最新版本由 [Sakuracio](https://github.com/Sakuracio) 提供__
|
||||
|
||||
## 更新
|
||||
|
||||
### 2022/5/29 \[v0.1.5.7]
|
||||
|
||||
* 修复错误的resource路径会删除IMAGE_PATH等重要文件夹
|
||||
* 提供了真寻适配仓库的插件 安装/卸载 操作
|
||||
|
||||
### 2022/5/28
|
||||
|
||||
* 修复私聊无法添加昵称
|
||||
|
||||
57
basic_plugins/plugin_shop/__init__.py
Normal file
57
basic_plugins/plugin_shop/__init__.py
Normal file
@ -0,0 +1,57 @@
|
||||
from nonebot import on_command, on_regex
|
||||
|
||||
from .data_source import show_plugin_repo, install_plugin, uninstall_plugin
|
||||
from services.log import logger
|
||||
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Message, MessageEvent
|
||||
from nonebot.params import CommandArg
|
||||
|
||||
from utils.message_builder import image
|
||||
from nonebot.permission import SUPERUSER
|
||||
|
||||
|
||||
__zx_plugin_name__ = "插件商店 [Superuser]"
|
||||
__plugin_usage__ = """
|
||||
usage:
|
||||
下载安装插件
|
||||
指令:
|
||||
查看插件仓库
|
||||
安装插件 [name/id]
|
||||
卸载插件 [name/id]
|
||||
""".strip()
|
||||
__plugin_des__ = "从真寻适配仓库中下载插件"
|
||||
__plugin_cmd__ = [""]
|
||||
__plugin_version__ = 0.1
|
||||
__plugin_author__ = "HibiKier"
|
||||
|
||||
show_repo = on_regex("^查看插件仓库$", priority=1, block=True, permission=SUPERUSER)
|
||||
|
||||
install_plugin_matcher = on_command("安装插件", priority=1, block=True, permission=SUPERUSER)
|
||||
|
||||
uninstall_plugin_matcher = on_command("卸载插件", priority=1, block=True, permission=SUPERUSER)
|
||||
|
||||
|
||||
@install_plugin_matcher.handle()
|
||||
async def _(bot: Bot, event: MessageEvent, arg: Message = CommandArg()):
|
||||
arg = arg.extract_plain_text().strip()
|
||||
msg = await install_plugin(arg)
|
||||
await install_plugin_matcher.send(msg)
|
||||
|
||||
|
||||
@uninstall_plugin_matcher.handle()
|
||||
async def _(bot: Bot, event: MessageEvent, arg: Message = CommandArg()):
|
||||
arg = arg.extract_plain_text().strip()
|
||||
msg = await uninstall_plugin(arg)
|
||||
await install_plugin_matcher.send(msg)
|
||||
|
||||
|
||||
@show_repo.handle()
|
||||
async def _(bot: Bot, event: MessageEvent):
|
||||
msg = await show_plugin_repo()
|
||||
if isinstance(msg, int):
|
||||
await show_repo.finish("文件下载失败或解压失败..")
|
||||
await show_repo.send(image(b64=msg))
|
||||
logger.info(
|
||||
f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
||||
f" 查看插件仓库"
|
||||
)
|
||||
|
||||
183
basic_plugins/plugin_shop/data_source.py
Normal file
183
basic_plugins/plugin_shop/data_source.py
Normal file
@ -0,0 +1,183 @@
|
||||
import os
|
||||
import shutil
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
from typing import Union, Tuple
|
||||
|
||||
import ujson as json
|
||||
|
||||
from services import logger
|
||||
from utils.http_utils import AsyncHttpx
|
||||
from configs.path_config import TEMP_PATH, DATA_PATH
|
||||
from utils.image_utils import text2image, BuildImage
|
||||
from utils.utils import is_number
|
||||
|
||||
path = DATA_PATH / "plugin_shop"
|
||||
if not path.exists():
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
repo_json_url = "https://github.com/zhenxun-org/nonebot_plugins_zhenxun_bot/archive/refs/heads/index.zip"
|
||||
repo_zip_path = TEMP_PATH / "plugin_repo_json.zip"
|
||||
plugin_json = path / "zhenxun_plugins.json"
|
||||
|
||||
extensive_plugin_path = Path() / "extensive_plugin"
|
||||
|
||||
path = DATA_PATH / "plugin_shop"
|
||||
if not path.exists():
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
if not extensive_plugin_path.exists():
|
||||
extensive_plugin_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
data = {}
|
||||
|
||||
|
||||
async def install_plugin(name: str) -> str:
|
||||
"""
|
||||
安装插件
|
||||
:param name: 插件名或下标
|
||||
"""
|
||||
try:
|
||||
if is_number(name):
|
||||
name, code = await get_plugin_name(int(name))
|
||||
if code != 200:
|
||||
return name
|
||||
plugin_url = data[name]["download_url"]
|
||||
url = (await AsyncHttpx.get(plugin_url)).headers.get("Location")
|
||||
zip_file = TEMP_PATH / f"{name}.zip"
|
||||
if zip_file.exists():
|
||||
zip_file.unlink()
|
||||
if await AsyncHttpx.download_file(url, zip_file):
|
||||
logger.debug("开始解压插件压缩包...")
|
||||
# 解压
|
||||
zf = zipfile.ZipFile(zip_file, "r")
|
||||
extract_path = TEMP_PATH / f"{name}"
|
||||
if extract_path.exists():
|
||||
shutil.rmtree(extract_path.absolute(), ignore_errors=True)
|
||||
extract_path.mkdir(exist_ok=True, parents=True)
|
||||
for file in zf.namelist():
|
||||
zf.extract(file, extract_path)
|
||||
zf.close()
|
||||
logger.debug("解压插件压缩包完成...")
|
||||
logger.debug("开始移动插件文件夹...")
|
||||
if (extensive_plugin_path / f"{name}").exists():
|
||||
logger.debug("extensive_plugin目录下文件夹已存在,删除该目录插件文件夹...")
|
||||
shutil.rmtree(
|
||||
(extensive_plugin_path / f"{name}").absolute(), ignore_errors=True
|
||||
)
|
||||
extract_path.rename(extensive_plugin_path / f"{name}")
|
||||
tmp = ""
|
||||
if "pyproject.toml" in os.listdir(extensive_plugin_path / f"{name}"):
|
||||
tmp = "检测到该插件含有额外依赖,当前安装无法保证依赖完全安装成功。"
|
||||
os.system(
|
||||
f"poetry run pip install -r {(extensive_plugin_path / f'{name}' / 'pyproject.toml').absolute()}"
|
||||
)
|
||||
logger.debug("移动插件文件夹完成...")
|
||||
logger.info(f"成功安装插件 {name} 成功!\n{tmp}")
|
||||
return f"成功安装插件 {name},请重启bot!"
|
||||
except Exception as e:
|
||||
logger.error(f"安装插件 {name} 失败 {type(e)}:{e}")
|
||||
return f"安装插件 {name} 失败 {type(e)}:{e}"
|
||||
|
||||
|
||||
async def uninstall_plugin(name: str) -> str:
|
||||
"""
|
||||
删除插件
|
||||
:param name: 插件名或下标
|
||||
"""
|
||||
try:
|
||||
if is_number(name):
|
||||
name, code = await get_plugin_name(int(name))
|
||||
if code != 200:
|
||||
return name
|
||||
if name not in os.listdir(extensive_plugin_path):
|
||||
return f"未安装 {name} 插件!"
|
||||
shutil.rmtree((extensive_plugin_path / name).absolute(), ignore_errors=True)
|
||||
logger.info(f"插件 {name} 删除成功!")
|
||||
return f"插件 {name} 删除成功!"
|
||||
except Exception as e:
|
||||
logger.error(f"删除插件 {name} 失败 {type(e)}:{e}")
|
||||
return f"删除插件 {name} 失败 {type(e)}:{e}"
|
||||
|
||||
|
||||
async def show_plugin_repo() -> Union[int, str]:
|
||||
"""
|
||||
获取插件仓库数据并格式化
|
||||
"""
|
||||
if not plugin_json.exists():
|
||||
code = await download_json()
|
||||
if code != 200:
|
||||
return code
|
||||
plugin_info = json.load(open(plugin_json, "r", encoding="utf8"))
|
||||
image_list = []
|
||||
w, h = 0, 0
|
||||
line_height = 10
|
||||
for i, key in enumerate(plugin_info.keys()):
|
||||
data[key] = {
|
||||
"名称": plugin_info[key]["plugin_name"],
|
||||
"模块": key,
|
||||
"作者": plugin_info[key]["author"],
|
||||
"版本": plugin_info[key]["version"],
|
||||
"简介": plugin_info[key]["introduction"],
|
||||
"download_url": plugin_info[key]["download_url"],
|
||||
"github_url": plugin_info[key]["github_url"],
|
||||
}
|
||||
s = (
|
||||
f'id:{i+1}\n名称:{plugin_info[key]["plugin_name"]}\n'
|
||||
f'模块:{key}\n'
|
||||
f'作者:{plugin_info[key]["author"]}\n'
|
||||
f'版本:{plugin_info[key]["version"]}\n'
|
||||
f'简介:{plugin_info[key]["introduction"]}\n'
|
||||
f"-------------------"
|
||||
)
|
||||
img = await text2image(s, font_size=20, color="#f9f6f2")
|
||||
w = w if w > img.w else img.w
|
||||
h += img.h + line_height
|
||||
image_list.append(img)
|
||||
A = BuildImage(w + 100, h + 100, color="#f9f6f2")
|
||||
cur_h = 50
|
||||
for img in image_list:
|
||||
await A.apaste(img, (50, cur_h))
|
||||
cur_h += img.h + line_height
|
||||
return A.pic2bs4()
|
||||
|
||||
|
||||
async def download_json() -> int:
|
||||
"""
|
||||
下载插件库json文件
|
||||
"""
|
||||
try:
|
||||
url = (await AsyncHttpx.get(repo_json_url)).headers.get("Location")
|
||||
if repo_zip_path.exists():
|
||||
repo_zip_path.unlink()
|
||||
if await AsyncHttpx.download_file(url, repo_zip_path):
|
||||
zf = zipfile.ZipFile(repo_zip_path, "r")
|
||||
extract_path = path / "temp"
|
||||
for file in zf.namelist():
|
||||
zf.extract(file, extract_path)
|
||||
zf.close()
|
||||
if plugin_json.exists():
|
||||
plugin_json.unlink()
|
||||
(
|
||||
extract_path
|
||||
/ "nonebot_plugins_zhenxun_bot-index"
|
||||
/ "zhenxun_plugins.json"
|
||||
).rename(plugin_json)
|
||||
shutil.rmtree(extract_path.absolute(), ignore_errors=True)
|
||||
return 200
|
||||
except Exception as e:
|
||||
logger.error(f"下载插件库压缩包失败或解压失败 {type(e)}:{e}")
|
||||
return 999
|
||||
|
||||
|
||||
async def get_plugin_name(name: int) -> Tuple[str, int]:
|
||||
"""
|
||||
通过下标获取插件名
|
||||
:param name: 下标
|
||||
"""
|
||||
name = int(name)
|
||||
if not data:
|
||||
await show_plugin_repo()
|
||||
if name < 1 or name > len(data.keys()):
|
||||
return "下标超过上下限!", 999
|
||||
name = list(data.keys())[name - 1]
|
||||
return name, 200
|
||||
1
bot.py
1
bot.py
@ -12,6 +12,7 @@ driver.on_shutdown(disconnect)
|
||||
nonebot.load_plugin("nonebot_plugin_apscheduler")
|
||||
nonebot.load_plugins("basic_plugins")
|
||||
nonebot.load_plugins("plugins")
|
||||
nonebot.load_plugins("extensive_plugin")
|
||||
# 最后加载权限控制
|
||||
nonebot.load_plugins("basic_plugins/hooks")
|
||||
|
||||
|
||||
@ -9,7 +9,8 @@
|
||||
"configs/utils",
|
||||
"poetry.lock",
|
||||
"pyproject.toml",
|
||||
"resources/font"
|
||||
"resources/font",
|
||||
"bot.py"
|
||||
],
|
||||
"add_file": [],
|
||||
"delete_file": []
|
||||
|
||||
@ -1508,7 +1508,7 @@ async def text2image(
|
||||
font=font,
|
||||
)
|
||||
await A.atext((left_padding, top_padding), text, font_color)
|
||||
# A.show()
|
||||
# A.show()
|
||||
return A
|
||||
|
||||
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
from typing import Union, List, Optional
|
||||
|
||||
from configs.path_config import IMAGE_PATH, DATA_PATH, RECORD_PATH, TEXT_PATH, FONT_PATH, LOG_PATH
|
||||
from .data_class import StaticData
|
||||
from pathlib import Path
|
||||
from ruamel.yaml import YAML
|
||||
@ -47,19 +49,21 @@ class ResourcesManager(StaticData):
|
||||
if module in self._data.keys():
|
||||
for x in self._data[module].keys():
|
||||
move_file = Path(self._data[module][x])
|
||||
if move_file.exists():
|
||||
shutil.rmtree(move_file.absolute(), ignore_errors=True)
|
||||
logger.info(f"已清除插件 {module} 资源路径:{self._data[module][x]}")
|
||||
del self._data[module][x]
|
||||
if move_file not in [IMAGE_PATH, DATA_PATH, RECORD_PATH, TEXT_PATH, FONT_PATH, LOG_PATH]:
|
||||
if move_file.exists():
|
||||
shutil.rmtree(move_file.absolute(), ignore_errors=True)
|
||||
logger.info(f"已清除插件 {module} 资源路径:{self._data[module][x]}")
|
||||
del self._data[module][x]
|
||||
else:
|
||||
if isinstance(source_file, Path):
|
||||
source_file = str(source_file.absolute())
|
||||
if source_file:
|
||||
if module in self._data.keys() and source_file in self._data[module].keys():
|
||||
move_file = Path(self._data[module][source_file])
|
||||
if move_file.exists():
|
||||
shutil.rmtree(move_file.absolute(), ignore_errors=True)
|
||||
del self._data[module][source_file]
|
||||
if move_file not in [IMAGE_PATH, DATA_PATH, RECORD_PATH, TEXT_PATH, FONT_PATH, LOG_PATH]:
|
||||
if move_file.exists():
|
||||
shutil.rmtree(move_file.absolute(), ignore_errors=True)
|
||||
del self._data[module][source_file]
|
||||
self.save()
|
||||
|
||||
def start_move(self):
|
||||
@ -97,7 +101,6 @@ class ResourcesManager(StaticData):
|
||||
"""
|
||||
添加临时清理文件夹
|
||||
:param path: 路径
|
||||
:param recursive: 是否将该目录下的所有目录也添加为临时文件夹
|
||||
"""
|
||||
if isinstance(path, str):
|
||||
path = Path(path)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user