更新插件商店 (#1573)

This commit is contained in:
xuanerwa 2024-08-26 10:39:33 +08:00 committed by GitHub
parent 65a6f608e6
commit 66e6f449cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 107 additions and 29 deletions

View File

@ -7,6 +7,12 @@ BASE_PATH.mkdir(parents=True, exist_ok=True)
CONFIG_URL = "https://cdn.jsdelivr.net/gh/zhenxun-org/zhenxun_bot_plugins/plugins.json" CONFIG_URL = "https://cdn.jsdelivr.net/gh/zhenxun-org/zhenxun_bot_plugins/plugins.json"
"""插件信息文件""" """插件信息文件"""
CONFIG_INDEX_URL = "https://raw.githubusercontent.com/zhenxun-org/zhenxun_bot_plugins_index/index/plugins.json"
"""插件索引库信息文件"""
CONFIG_INDEX_CDN_URL = "https://cdn.jsdelivr.net/gh/zhenxun-org/zhenxun_bot_plugins_index@index/plugins.json"
"""插件索引库信息文件cdn"""
DOWNLOAD_URL = ( DOWNLOAD_URL = (
"https://api.github.com/repos/zhenxun-org/zhenxun_bot_plugins/contents/{}?ref=main" "https://api.github.com/repos/zhenxun-org/zhenxun_bot_plugins/contents/{}?ref=main"
) )

View File

@ -1,3 +1,4 @@
import re
import shutil import shutil
import subprocess import subprocess
from pathlib import Path from pathlib import Path
@ -9,7 +10,7 @@ from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
from .config import BASE_PATH, CONFIG_URL, DOWNLOAD_URL from .config import BASE_PATH, CONFIG_URL, CONFIG_INDEX_URL, CONFIG_INDEX_CDN_URL, DOWNLOAD_URL
def row_style(column: str, text: str) -> RowStyle: def row_style(column: str, text: str) -> RowStyle:
@ -30,7 +31,7 @@ def row_style(column: str, text: str) -> RowStyle:
async def recurrence_get_url( async def recurrence_get_url(
url: str, data_list: list[tuple[str, str]], ignore_list: list[str] = [] url: str, data_list: list[tuple[str, str]], ignore_list: list[str] = [], api_url: str = None
): ):
"""递归获取目录下所有文件 """递归获取目录下所有文件
@ -53,48 +54,54 @@ async def recurrence_get_url(
data_list.append((json_data.get("download_url"), json_data["path"])) data_list.append((json_data.get("download_url"), json_data["path"]))
for download_url, path in data_list: for download_url, path in data_list:
if not download_url: if not download_url:
_url = DOWNLOAD_URL.format(path) _url = api_url + path if api_url else DOWNLOAD_URL.format(path)
if _url not in ignore_list: if _url not in ignore_list:
ignore_list.append(_url) ignore_list.append(_url)
await recurrence_get_url(_url, data_list, ignore_list) await recurrence_get_url(_url, data_list, ignore_list, api_url)
async def download_file(url: str): async def download_file(url: str, _is: bool = False, api_url: str = None):
"""下载文件 """下载文件
参数: 参数:
url: 插件详情url url: 插件详情url
_is: 是否为第三方插件
url_start : 第三方插件url
异常: 异常:
ValueError: 下载失败 ValueError: 下载失败
""" """
data_list = [] data_list = []
await recurrence_get_url(url, data_list) await recurrence_get_url(url, data_list, api_url=api_url)
for download_url, path in data_list: for download_url, path in data_list:
if download_url and "." in path: if download_url and "." in path:
logger.debug(f"下载文件: {path}", "插件管理") logger.debug(f"下载文件: {path}", "插件管理")
file = Path(f"zhenxun/{path}") base_path = "zhenxun/plugins/" if _is else "zhenxun/"
file = Path(f"{base_path}{path}")
file.parent.mkdir(parents=True, exist_ok=True) file.parent.mkdir(parents=True, exist_ok=True)
print(download_url)
r = await AsyncHttpx.get(download_url) r = await AsyncHttpx.get(download_url)
if r.status_code != 200: if r.status_code != 200:
raise ValueError(f"文件下载错误, code: {r.status_code}") raise ValueError(f"文件下载错误, code: {r.status_code}")
content = r.text.replace("\r\n", "\n") # 统一换行符为 UNIX 风格
with open(file, "w", encoding="utf8") as f: with open(file, "w", encoding="utf8") as f:
logger.debug(f"写入文件: {file}", "插件管理") logger.debug(f"写入文件: {file}", "插件管理")
f.write(r.text) f.write(content)
def install_requirement(plugin_path: Path): def install_requirement(plugin_path: Path):
requirement_path = plugin_path / "requirement.txt" requirement_files = ["requirement.txt", "requirements.txt"]
requirement_paths = [plugin_path / file for file in requirement_files]
if not requirement_path.exists(): existing_requirements = next((path for path in requirement_paths if path.exists()), None)
logger.debug(
f"No requirement.txt found for plugin: {plugin_path.name}", "插件管理" if not existing_requirements:
) logger.debug(f"No requirement.txt found for plugin: {plugin_path.name}", "插件管理")
return return
try: try:
result = subprocess.run( result = subprocess.run(
["pip", "install", "-r", str(requirement_path)], ["pip", "install", "-r", str(existing_requirements)],
check=True, check=True,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
@ -111,7 +118,6 @@ def install_requirement(plugin_path: Path):
class ShopManage: class ShopManage:
type2name = { type2name = {
"NORMAL": "普通插件", "NORMAL": "普通插件",
"ADMIN": "管理员插件", "ADMIN": "管理员插件",
@ -132,9 +138,20 @@ class ShopManage:
dict: 插件信息数据 dict: 插件信息数据
""" """
res = await AsyncHttpx.get(CONFIG_URL) res = await AsyncHttpx.get(CONFIG_URL)
if res.status_code != 200: res2 = await AsyncHttpx.get(CONFIG_INDEX_URL)
raise ValueError(f"下载错误, code: {res.status_code}")
return json.loads(res.text) if res2.status_code != 200:
logger.info("访问第三方插件信息文件失败改为进行cdn访问")
res2 = await AsyncHttpx.get(CONFIG_INDEX_CDN_URL)
# 检查请求结果
if res.status_code != 200 or res2.status_code != 200:
raise ValueError(f"下载错误, code: {res.status_code}, {res2.status_code}")
# 解析并合并返回的 JSON 数据
data1 = json.loads(res.text)
data2 = json.loads(res2.text)
return {**data1, **data2}
@classmethod @classmethod
def version_check(cls, plugin_info: dict, suc_plugin: dict[str, str]): def version_check(cls, plugin_info: dict, suc_plugin: dict[str, str]):
@ -219,16 +236,43 @@ class ShopManage:
plugin_info = data[plugin_key] plugin_info = data[plugin_key]
module_path_split = plugin_info["module_path"].split(".") module_path_split = plugin_info["module_path"].split(".")
url_path = cls.get_url_path(plugin_info["module_path"], plugin_info["is_dir"]) url_path = cls.get_url_path(plugin_info["module_path"], plugin_info["is_dir"])
if not url_path: if not url_path and plugin_info["module_path"]:
return "插件下载地址构建失败..." return "插件下载地址构建失败..."
logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理") logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理")
await download_file(DOWNLOAD_URL.format(url_path)) github_url = plugin_info.get("github_url")
if github_url:
github_path = re.search(r"github\.com/([^/]+/[^/]+)", github_url).group(1)
api_url = f"https://api.github.com/repos/{github_path}/contents/"
download_url = f"{api_url}{url_path}?ref=main"
else:
download_url = DOWNLOAD_URL.format(url_path)
api_url = None
await download_file(download_url, bool(github_url), api_url)
# 安装依赖 # 安装依赖
plugin_path = BASE_PATH / "/".join(module_path_split) plugin_path = BASE_PATH / "/".join(module_path_split)
if url_path and github_url:
plugin_path = BASE_PATH / "plugins" / "/".join(module_path_split)
res = await AsyncHttpx.get(api_url)
if res.status_code != 200:
return f"访问错误, code: {res.status_code}"
json_data = res.json()
requirement_file = next(
(v for v in json_data if v["name"] in ["requirements.txt", "requirement.txt"]), None
)
if requirement_file:
r = await AsyncHttpx.get(requirement_file.get("download_url"))
if r.status_code != 200:
raise ValueError(f"文件下载错误, code: {r.status_code}")
requirement_path = plugin_path / requirement_file["name"]
with open(requirement_path, "w", encoding="utf8") as f:
logger.debug(f"写入文件: {requirement_path}", "插件管理")
f.write(r.text)
install_requirement(plugin_path) install_requirement(plugin_path)
return f"插件 {plugin_key} 安装成功!" return f"插件 {plugin_key} 安装成功! 重启后生效"
@classmethod @classmethod
async def remove_plugin(cls, plugin_id: int) -> str: async def remove_plugin(cls, plugin_id: int) -> str:
@ -246,6 +290,9 @@ class ShopManage:
plugin_key = list(data.keys())[plugin_id] plugin_key = list(data.keys())[plugin_id]
plugin_info = data[plugin_key] plugin_info = data[plugin_key]
path = BASE_PATH path = BASE_PATH
github_url = plugin_info.get("github_url")
if github_url:
path = BASE_PATH / 'plugins'
for p in plugin_info["module_path"].split("."): for p in plugin_info["module_path"].split("."):
path = path / p path = path / p
if not plugin_info["is_dir"]: if not plugin_info["is_dir"]:
@ -257,7 +304,7 @@ class ShopManage:
shutil.rmtree(path) shutil.rmtree(path)
else: else:
path.unlink() path.unlink()
return f"插件 {plugin_key} 移除成功!" return f"插件 {plugin_key} 移除成功! 重启后生效"
@classmethod @classmethod
async def search_plugin(cls, plugin_name_or_author: str) -> BuildImage | str: async def search_plugin(cls, plugin_name_or_author: str) -> BuildImage | str:
@ -323,16 +370,41 @@ class ShopManage:
plugin_key = list(data.keys())[plugin_id] plugin_key = list(data.keys())[plugin_id]
plugin_info = data[plugin_key] plugin_info = data[plugin_key]
module_path_split = plugin_info["module_path"].split(".") module_path_split = plugin_info["module_path"].split(".")
url_path = url_path = cls.get_url_path( url_path = cls.get_url_path(plugin_info["module_path"], plugin_info["is_dir"])
plugin_info["module_path"], plugin_info["is_dir"] if not url_path and plugin_info["module_path"]:
)
if not url_path:
return "插件下载地址构建失败..." return "插件下载地址构建失败..."
logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理") logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理")
await download_file(DOWNLOAD_URL.format(url_path)) github_url = plugin_info.get("github_url")
if github_url:
github_path = re.search(r"github\.com/([^/]+/[^/]+)", github_url).group(1)
api_url = f"https://api.github.com/repos/{github_path}/contents/"
download_url = f"{api_url}{url_path}?ref=main"
else:
download_url = DOWNLOAD_URL.format(url_path)
api_url = None
await download_file(download_url, bool(github_url), api_url)
# 安装依赖 # 安装依赖
plugin_path = BASE_PATH / "/".join(module_path_split) plugin_path = BASE_PATH / "/".join(module_path_split)
if url_path and github_url:
plugin_path = BASE_PATH / "plugins" / "/".join(module_path_split)
res = await AsyncHttpx.get(api_url)
if res.status_code != 200:
return f"访问错误, code: {res.status_code}"
json_data = res.json()
requirement_file = next(
(v for v in json_data if v["name"] in ["requirements.txt", "requirement.txt"]), None
)
if requirement_file:
r = await AsyncHttpx.get(requirement_file.get("download_url"))
if r.status_code != 200:
raise ValueError(f"文件下载错误, code: {r.status_code}")
requirement_path = plugin_path / requirement_file["name"]
with open(requirement_path, "w", encoding="utf8") as f:
logger.debug(f"写入文件: {requirement_path}", "插件管理")
f.write(r.text)
install_requirement(plugin_path) install_requirement(plugin_path)
return f"插件 {plugin_key} 更新成功!" return f"插件 {plugin_key} 更新成功! 重启后生效"