🔨 提取GitHub相关操作 (#1609)

* 🔨 提取GitHub相关操作

* 🔨 重构API策略
This commit is contained in:
AkashiCoin 2024-09-08 09:38:55 +08:00
parent 342e70cc38
commit f11e9c58e4
15 changed files with 425 additions and 326 deletions

10
.vscode/extensions.json vendored Normal file
View File

@ -0,0 +1,10 @@
{
"recommendations": [
"charliermarsh.ruff",
"esbenp.prettier-vscode",
"ms-python.black-formatter",
"ms-python.isort",
"ms-python.python",
"ms-python.vscode-pylance"
]
}

View File

@ -26,6 +26,20 @@ def init_mocked_api(mocked_api: MockRouter) -> None:
url="https://api.github.com/repos/HibiKier/zhenxun_bot/releases/latest",
name="release_latest",
).respond(json=get_response_json("release_latest.json"))
mocked_api.head(
url="https://raw.githubusercontent.com/",
name="head_raw",
).respond(text="")
mocked_api.head(
url="https://github.com/",
name="head_github",
).respond(text="")
mocked_api.head(
url="https://codeload.github.com/",
name="head_codeload",
).respond(text="")
mocked_api.get(
url="https://raw.githubusercontent.com/HibiKier/zhenxun_bot/dev/__version__",
name="dev_branch_version",
@ -75,13 +89,13 @@ def init_mocked_api(mocked_api: MockRouter) -> None:
content=tar_buffer.getvalue(),
)
mocked_api.get(
url="https://ghproxy.cc/https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/dev.zip",
url="https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/dev.zip",
name="dev_download_url",
).respond(
content=zip_bytes.getvalue(),
)
mocked_api.get(
url="https://ghproxy.cc/https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/main.zip",
url="https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/main.zip",
name="main_download_url",
).respond(
content=zip_bytes.getvalue(),
@ -306,7 +320,6 @@ async def test_check_update_release(
)
ctx.should_finished(_matcher)
assert mocked_api["release_latest"].called
assert mocked_api["release_download_url"].called
assert mocked_api["release_download_url_redirect"].called
assert (mock_backup_path / PYPROJECT_FILE_STRING).exists()

View File

@ -10,10 +10,10 @@ from nonebot.utils import run_sync
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.github_utils.models import RepoInfo
from zhenxun.utils.github_utils import parse_github_url
from .config import (
DEV_URL,
MAIN_URL,
TMP_PATH,
BASE_PATH,
BACKUP_PATH,
@ -25,6 +25,7 @@ from .config import (
BASE_PATH_STRING,
DOWNLOAD_GZ_FILE,
DOWNLOAD_ZIP_FILE,
DEFAULT_GITHUB_URL,
PYPROJECT_LOCK_FILE,
REQ_TXT_FILE_STRING,
PYPROJECT_FILE_STRING,
@ -169,23 +170,19 @@ class UpdateManage:
cur_version = cls.__get_version()
url = None
new_version = None
if version_type == "dev":
url = DEV_URL
new_version = await cls.__get_version_from_branch("dev")
if new_version:
new_version = new_version.split(":")[-1].strip()
elif version_type == "main":
url = MAIN_URL
new_version = await cls.__get_version_from_branch("main")
repo_info = parse_github_url(DEFAULT_GITHUB_URL)
if version_type in {"dev", "main"}:
repo_info.branch = version_type
new_version = await cls.__get_version_from_repo(repo_info)
if new_version:
new_version = new_version.split(":")[-1].strip()
url = await repo_info.get_archive_download_url()
elif version_type == "release":
data = await cls.__get_latest_data()
if not data:
return "获取更新版本失败..."
url = data.get("tarball_url")
new_version = data.get("name")
url = (await AsyncHttpx.get(url)).headers.get("Location") # type: ignore
new_version = data.get("name", "")
url = await repo_info.get_release_source_download_url_tgz(new_version)
if not url:
return "获取版本下载链接失败..."
if TMP_PATH.exists():
@ -247,7 +244,7 @@ class UpdateManage:
return {}
@classmethod
async def __get_version_from_branch(cls, branch: str) -> str:
async def __get_version_from_repo(cls, repo_info: RepoInfo) -> str:
"""从指定分支获取版本号
参数:
@ -256,11 +253,11 @@ class UpdateManage:
返回:
str: 版本号
"""
version_url = f"https://raw.githubusercontent.com/HibiKier/zhenxun_bot/{branch}/__version__"
version_url = await repo_info.get_raw_download_url(path="__version__")
try:
res = await AsyncHttpx.get(version_url)
if res.status_code == 200:
return res.text.strip()
except Exception as e:
logger.error(f"获取 {branch} 分支版本失败", e=e)
logger.error(f"获取 {repo_info.branch} 分支版本失败", e=e)
return "未知版本"

View File

@ -2,8 +2,7 @@ from pathlib import Path
from zhenxun.configs.path_config import TEMP_PATH
DEV_URL = "https://ghproxy.cc/https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/dev.zip"
MAIN_URL = "https://ghproxy.cc/https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/main.zip"
DEFAULT_GITHUB_URL = "https://github.com/HibiKier/zhenxun_bot/tree/main"
RELEASE_URL = "https://api.github.com/repos/HibiKier/zhenxun_bot/releases/latest"
VERSION_FILE_STRING = "__version__"
@ -23,8 +22,10 @@ TMP_PATH = TEMP_PATH / "auto_update"
BACKUP_PATH = Path() / "backup"
DOWNLOAD_GZ_FILE = TMP_PATH / "download_latest_file.tar.gz"
DOWNLOAD_ZIP_FILE = TMP_PATH / "download_latest_file.zip"
DOWNLOAD_GZ_FILE_STRING = "download_latest_file.tar.gz"
DOWNLOAD_ZIP_FILE_STRING = "download_latest_file.zip"
DOWNLOAD_GZ_FILE = TMP_PATH / DOWNLOAD_GZ_FILE_STRING
DOWNLOAD_ZIP_FILE = TMP_PATH / DOWNLOAD_ZIP_FILE_STRING
REPLACE_FOLDERS = [
"builtin_plugins",

View File

@ -1,4 +1,3 @@
import re
from pathlib import Path
BASE_PATH = Path() / "zhenxun"
@ -10,21 +9,3 @@ DEFAULT_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins/tree/ma
EXTRA_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins_index/tree/index"
"""插件库索引github仓库地址"""
GITHUB_REPO_URL_PATTERN = re.compile(
r"^https://github.com/(?P<owner>[^/]+)/(?P<repo>[^/]+)(/tree/(?P<branch>[^/]+))?$"
)
"""github仓库地址正则"""
JSD_PACKAGE_API_FORMAT = (
"https://data.jsdelivr.com/v1/packages/gh/{owner}/{repo}@{branch}"
)
"""jsdelivr包地址格式"""
GIT_API_TREES_FORMAT = (
"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
)
"""git api trees地址格式"""
CACHED_API_TTL = 300
"""缓存api ttl"""

View File

@ -8,14 +8,11 @@ from aiocache import cached
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.github_utils.models import RepoAPI
from zhenxun.utils.github_utils import api_strategy, parse_github_url
from zhenxun.builtin_plugins.plugin_store.models import StorePluginInfo
from zhenxun.utils.image_utils import RowStyle, BuildImage, ImageTemplate
from zhenxun.builtin_plugins.auto_update.config import REQ_TXT_FILE_STRING
from zhenxun.builtin_plugins.plugin_store.models import (
BaseAPI,
RepoInfo,
PackageApi,
StorePluginInfo,
)
from .config import BASE_PATH, EXTRA_GITHUB_URL, DEFAULT_GITHUB_URL
@ -81,12 +78,12 @@ class ShopManage:
返回:
dict: 插件信息数据
"""
default_github_url = await RepoInfo.parse_github_url(
default_github_url = await parse_github_url(
DEFAULT_GITHUB_URL
).get_download_url_with_path("plugins.json")
extra_github_url = await RepoInfo.parse_github_url(
).get_raw_download_url("plugins.json")
extra_github_url = await parse_github_url(
EXTRA_GITHUB_URL
).get_download_url_with_path("plugins.json")
).get_raw_download_url("plugins.json")
res = await AsyncHttpx.get(default_github_url)
res2 = await AsyncHttpx.get(extra_github_url)
@ -211,29 +208,26 @@ class ShopManage:
async def install_plugin_with_repo(
cls, github_url: str, module_path: str, is_dir: bool, is_external: bool = False
):
package_api: PackageApi
files: list[str]
package_info: BaseAPI
repo_info = RepoInfo.parse_github_url(github_url)
repo_api: RepoAPI
repo_info = parse_github_url(github_url)
logger.debug(f"成功获取仓库信息: {repo_info}", "插件管理")
for package_api in PackageApi:
for repo_api in api_strategy:
try:
package_info = await package_api.value.parse_repo_info(repo_info)
await repo_api.parse_repo_info(repo_info)
break
except Exception as e:
logger.warning(
f"获取插件文件失败: {e} | API类型: {package_api.value}", "插件管理"
f"获取插件文件失败: {e} | API类型: {repo_api.strategy}", "插件管理"
)
continue
else:
raise ValueError("所有API获取插件文件失败请检查网络连接")
files = package_info.get_files(
files = repo_api.get_files(
module_path=module_path.replace(".", "/") + ("" if is_dir else ".py"),
is_dir=is_dir,
)
download_urls = [
await repo_info.get_download_url_with_path(file) for file in files
]
download_urls = [await repo_info.get_raw_download_url(file) for file in files]
base_path = BASE_PATH / "plugins" if is_external else BASE_PATH
download_paths: list[Path | str] = [base_path / file for file in files]
logger.debug(f"插件下载路径: {download_paths}", "插件管理")
@ -244,11 +238,11 @@ class ShopManage:
else:
# 安装依赖
plugin_path = base_path / "/".join(module_path.split("."))
req_files = package_info.get_files(REQ_TXT_FILE_STRING, False)
req_files.extend(package_info.get_files("requirement.txt", False))
req_files = repo_api.get_files(REQ_TXT_FILE_STRING, False)
req_files.extend(repo_api.get_files("requirement.txt", False))
logger.debug(f"获取插件依赖文件列表: {req_files}", "插件管理")
req_download_urls = [
await repo_info.get_download_url_with_path(file) for file in req_files
await repo_info.get_raw_download_url(file) for file in req_files
]
req_paths: list[Path | str] = [plugin_path / file for file in req_files]
logger.debug(f"插件依赖文件下载路径: {req_paths}", "插件管理")

View File

@ -1,19 +1,6 @@
from enum import Enum
from abc import ABC, abstractmethod
from aiocache import cached
from strenum import StrEnum
from pydantic import BaseModel
from zhenxun.utils.enum import PluginType
from zhenxun.utils.http_utils import AsyncHttpx
from .config import (
CACHED_API_TTL,
GIT_API_TREES_FORMAT,
JSD_PACKAGE_API_FORMAT,
GITHUB_REPO_URL_PATTERN,
)
type2name: dict[str, str] = {
"NORMAL": "普通插件",
@ -41,217 +28,3 @@ class StorePluginInfo(BaseModel):
@property
def plugin_type_name(self):
return type2name[self.plugin_type.value]
class RepoInfo(BaseModel):
"""仓库信息"""
owner: str
repo: str
branch: str = "main"
async def get_download_url_with_path(self, path: str):
url_format = await self.get_fastest_format()
return url_format.format(**self.dict(), path=path)
@classmethod
def parse_github_url(cls, github_url: str) -> "RepoInfo":
if matched := GITHUB_REPO_URL_PATTERN.match(github_url):
return RepoInfo(**{k: v for k, v in matched.groupdict().items() if v})
raise ValueError("github地址格式错误")
@classmethod
@cached()
async def get_fastest_format(cls) -> str:
return await cls._get_fastest_format()
@classmethod
async def _get_fastest_format(cls) -> str:
"""获取最快下载地址格式"""
raw_format = "https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
patterns: dict[str, str] = {
"https://raw.githubusercontent.com/": raw_format,
"https://ghproxy.cc/": f"https://ghproxy.cc/{raw_format}",
"https://mirror.ghproxy.com/": f"https://mirror.ghproxy.com/{raw_format}",
"https://gh-proxy.com/": f"https://gh-proxy.com/{raw_format}",
"https://cdn.jsdelivr.net/": "https://cdn.jsdelivr.net/gh/{owner}/{repo}@{branch}/{path}",
}
sorted_urls = await AsyncHttpx.get_fastest_mirror(list(patterns.keys()))
if not sorted_urls:
raise Exception("无法获取任意GitHub资源加速地址请检查网络")
return patterns[sorted_urls[0]]
class FileType(StrEnum):
"""文件类型"""
FILE = "file"
DIR = "directory"
PACKAGE = "gh"
class BaseAPI(BaseModel, ABC):
"""基础接口"""
@classmethod
@abstractmethod
@cached(ttl=CACHED_API_TTL)
async def parse_repo_info(cls, repo_info: RepoInfo) -> "BaseAPI": ...
@abstractmethod
def get_files(cls, module_path: str, is_dir) -> list[str]: ...
class JsdelivrAPI(BaseAPI):
"""jsdelivr接口"""
type: FileType
name: str
files: list["JsdelivrAPI"] = []
def recurrence_files(self, dir_path: str, is_dir: bool = True) -> list[str]:
"""
递归获取文件路径
参数:
files: 文件列表
dir_path: 目录路径
is_dir: 是否为目录
返回:
list[str]: 文件路径
"""
if not is_dir and dir_path.endswith(self.name):
return [dir_path]
if self.files is None:
raise ValueError("文件列表为空")
paths = []
for file in self.files:
if is_dir and file.type == FileType.DIR and file.files:
paths.extend(self.recurrence_files(f"{dir_path}/{file.name}", is_dir))
elif file.type == FileType.FILE:
if is_dir:
paths.append(f"{dir_path}/{file.name}")
elif dir_path.endswith(file.name):
paths.append(dir_path)
return paths
def full_files_path(self, module_path: str, is_dir: bool = True) -> "JsdelivrAPI":
"""
获取文件路径
参数:
module_path: 模块路径
is_dir: 是否为目录
返回:
list[FileInfo]: 文件路径
"""
paths: list[str] = module_path.split("/")
if not is_dir:
paths = paths[:-1]
cur_file: JsdelivrAPI = self
for path in paths:
for file in cur_file.files:
if file.type == FileType.DIR and file.name == path and file.files:
cur_file = file
break
else:
raise ValueError(f"模块路径 {module_path} 不存在")
return cur_file
@classmethod
@cached(ttl=CACHED_API_TTL)
async def parse_repo_info(cls, repo_info: RepoInfo) -> "JsdelivrAPI":
"""解析仓库信息"""
"""获取插件包信息
参数:
repo_info: 仓库信息
返回:
FileInfo: 插件包信息
"""
jsd_package_url: str = JSD_PACKAGE_API_FORMAT.format(
owner=repo_info.owner, repo=repo_info.repo, branch=repo_info.branch
)
res = await AsyncHttpx.get(url=jsd_package_url)
if res.status_code != 200:
raise ValueError(f"下载错误, code: {res.status_code}")
return JsdelivrAPI(**res.json())
def get_files(self, module_path: str, is_dir: bool = True) -> list[str]:
"""获取文件路径"""
file = self.full_files_path(module_path, is_dir)
files = file.recurrence_files(
module_path,
is_dir,
)
return files
class TreeType(StrEnum):
"""树类型"""
FILE = "blob"
DIR = "tree"
class Tree(BaseModel):
""""""
path: str
mode: str
type: TreeType
sha: str
size: int | None
url: str
class GitHubAPI(BaseAPI):
"""github接口"""
sha: str
url: str
tree: list[Tree]
def export_files(self, module_path: str) -> list[str]:
"""导出文件路径"""
return [
file.path
for file in self.tree
if file.type == TreeType.FILE and file.path.startswith(module_path)
]
@classmethod
@cached(ttl=CACHED_API_TTL)
async def parse_repo_info(cls, repo_info: RepoInfo) -> "GitHubAPI":
"""获取仓库树
参数:
repo_info: 仓库信息
返回:
TreesInfo: 仓库树信息
"""
git_tree_url: str = GIT_API_TREES_FORMAT.format(
owner=repo_info.owner, repo=repo_info.repo, branch=repo_info.branch
)
res = await AsyncHttpx.get(url=git_tree_url)
if res.status_code != 200:
raise ValueError(f"下载错误, code: {res.status_code}")
return GitHubAPI(**res.json())
def get_files(self, module_path: str, is_dir: bool = True) -> list[str]:
"""获取文件路径"""
return self.export_files(module_path)
class PackageApi(Enum):
"""插件包接口"""
GITHUB = GitHubAPI
JSDELIVR = JsdelivrAPI

View File

@ -1,7 +1,17 @@
import nonebot
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from strenum import StrEnum
from fastapi.middleware.cors import CORSMiddleware
from zhenxun.configs.path_config import DATA_PATH, TEMP_PATH
WEBUI_STRING = "web_ui"
PUBLIC_STRING = "public"
WEBUI_DATA_PATH = DATA_PATH / WEBUI_STRING
PUBLIC_PATH = WEBUI_DATA_PATH / PUBLIC_STRING
TMP_PATH = TEMP_PATH / WEBUI_STRING
WEBUI_DIST_GITHUB_URL = "https://github.com/HibiKier/zhenxun_bot_webui/tree/dist"
app = nonebot.get_app()

View File

@ -1,11 +1,11 @@
from fastapi import APIRouter, FastAPI
from fastapi import FastAPI, APIRouter
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from zhenxun.services.log import logger
from .config import PUBLIC_PATH
from .data_source import update_webui_assets
from ..config import PUBLIC_PATH
from .data_source import COMMAND_NAME, update_webui_assets
router = APIRouter()
@ -23,13 +23,16 @@ async def favicon():
async def init_public(app: FastAPI):
try:
if not PUBLIC_PATH.exists():
await update_webui_assets()
folders = await update_webui_assets()
else:
folders = [x.name for x in PUBLIC_PATH.iterdir()]
app.include_router(router)
for pathname in ["css", "js", "fonts", "img"]:
for pathname in folders:
logger.debug(f"挂载文件夹: {pathname}")
app.mount(
f"/{pathname}",
StaticFiles(directory=PUBLIC_PATH / pathname, check_dir=True),
name=f"public_{pathname}",
)
except Exception as e:
logger.error(f"初始化 web ui assets 失败", "Web UI assets", e=e)
logger.error("初始化 WebUI资源 失败", COMMAND_NAME, e=e)

View File

@ -1,20 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from zhenxun.configs.path_config import DATA_PATH, TEMP_PATH
class PublicData(BaseModel):
etag: str
update_time: datetime
COMMAND_NAME = "webui_update_assets"
WEBUI_DATA_PATH = DATA_PATH / "web_ui"
PUBLIC_PATH = WEBUI_DATA_PATH / "public"
TMP_PATH = TEMP_PATH / "web_ui"
GITHUB_API_COMMITS = "https://api.github.com/repos/HibiKier/zhenxun_bot_webui/commits"
WEBUI_ASSETS_DOWNLOAD_URL = (
"https://github.com/HibiKier/zhenxun_bot_webui/archive/refs/heads/dist.zip"
)

View File

@ -1,4 +1,3 @@
import os
import shutil
import zipfile
from pathlib import Path
@ -7,14 +6,20 @@ from nonebot.utils import run_sync
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.github_utils import parse_github_url
from .config import COMMAND_NAME, PUBLIC_PATH, TMP_PATH, WEBUI_ASSETS_DOWNLOAD_URL
from ..config import TMP_PATH, PUBLIC_PATH, WEBUI_DIST_GITHUB_URL
COMMAND_NAME = "WebUI资源管理"
async def update_webui_assets():
webui_assets_path = TMP_PATH / "webui_assets.zip"
download_url = await parse_github_url(
WEBUI_DIST_GITHUB_URL
).get_archive_download_url()
if await AsyncHttpx.download_file(
WEBUI_ASSETS_DOWNLOAD_URL, webui_assets_path, follow_redirects=True
download_url, webui_assets_path, follow_redirects=True
):
logger.info("下载 webui_assets 成功...", COMMAND_NAME)
return await _file_handle(webui_assets_path)
@ -30,10 +35,9 @@ def _file_handle(webui_assets_path: Path):
logger.debug("解压 webui_assets 成功...", COMMAND_NAME)
else:
raise Exception("解压 webui_assets 失败,文件不存在...", COMMAND_NAME)
download_file_path = (
TMP_PATH / [x for x in os.listdir(TMP_PATH) if (TMP_PATH / x).is_dir()][0]
)
download_file_path = TMP_PATH / next(iter(TMP_PATH.iterdir()))
shutil.rmtree(PUBLIC_PATH, ignore_errors=True)
shutil.copytree(download_file_path / "dist", PUBLIC_PATH, dirs_exist_ok=True)
logger.debug("复制 webui_assets 成功...", COMMAND_NAME)
shutil.rmtree(TMP_PATH, ignore_errors=True)
return [x.name for x in PUBLIC_PATH.iterdir()]

View File

@ -0,0 +1,23 @@
from .consts import GITHUB_REPO_URL_PATTERN
from .func import get_fastest_raw_format, get_fastest_archive_format
from .models import RepoAPI, RepoInfo, GitHubStrategy, JsdelivrStrategy
__all__ = [
"parse_github_url",
"get_fastest_raw_format",
"get_fastest_archive_format",
"api_strategy",
]
def parse_github_url(github_url: str) -> "RepoInfo":
if matched := GITHUB_REPO_URL_PATTERN.match(github_url):
return RepoInfo(**{k: v for k, v in matched.groupdict().items() if v})
raise ValueError("github地址格式错误")
# 使用
jsdelivr_api = RepoAPI(JsdelivrStrategy()) # type: ignore
github_api = RepoAPI(GitHubStrategy()) # type: ignore
api_strategy = [jsdelivr_api, github_api]

View File

@ -0,0 +1,35 @@
import re
GITHUB_REPO_URL_PATTERN = re.compile(
r"^https://github.com/(?P<owner>[^/]+)/(?P<repo>[^/]+)(/tree/(?P<branch>[^/]+))?$"
)
"""github仓库地址正则"""
JSD_PACKAGE_API_FORMAT = (
"https://data.jsdelivr.com/v1/packages/gh/{owner}/{repo}@{branch}"
)
"""jsdelivr包地址格式"""
GIT_API_TREES_FORMAT = (
"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
)
"""git api trees地址格式"""
CACHED_API_TTL = 300
"""缓存api ttl"""
RAW_CONTENT_FORMAT = "https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
"""raw content格式"""
ARCHIVE_URL_FORMAT = "https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip"
"""archive url格式"""
RELEASE_ASSETS_FORMAT = (
"https://github.com/{owner}/{repo}/releases/download/{version}/{filename}"
)
"""release assets格式"""
RELEASE_SOURCE_FORMAT = (
"https://codeload.github.com/{owner}/{repo}/legacy.{compress}/refs/tags/{version}"
)
"""release 源码格式"""

View File

@ -0,0 +1,63 @@
from aiocache import cached
from ..http_utils import AsyncHttpx
from .consts import (
ARCHIVE_URL_FORMAT,
RAW_CONTENT_FORMAT,
RELEASE_ASSETS_FORMAT,
RELEASE_SOURCE_FORMAT,
)
async def __get_fastest_format(formats: dict[str, str]) -> str:
sorted_urls = await AsyncHttpx.get_fastest_mirror(list(formats.keys()))
if not sorted_urls:
raise Exception("无法获取任意GitHub资源加速地址请检查网络")
return formats[sorted_urls[0]]
@cached()
async def get_fastest_raw_format() -> str:
"""获取最快的raw下载地址格式"""
formats: dict[str, str] = {
"https://raw.githubusercontent.com/": RAW_CONTENT_FORMAT,
"https://ghproxy.cc/": f"https://ghproxy.cc/{RAW_CONTENT_FORMAT}",
"https://mirror.ghproxy.com/": f"https://mirror.ghproxy.com/{RAW_CONTENT_FORMAT}",
"https://gh-proxy.com/": f"https://gh-proxy.com/{RAW_CONTENT_FORMAT}",
"https://cdn.jsdelivr.net/": "https://cdn.jsdelivr.net/gh/{owner}/{repo}@{branch}/{path}",
}
return await __get_fastest_format(formats)
@cached()
async def get_fastest_archive_format() -> str:
"""获取最快的归档下载地址格式"""
formats: dict[str, str] = {
"https://github.com/": ARCHIVE_URL_FORMAT,
"https://ghproxy.cc/": f"https://ghproxy.cc/{ARCHIVE_URL_FORMAT}",
"https://mirror.ghproxy.com/": f"https://mirror.ghproxy.com/{ARCHIVE_URL_FORMAT}",
"https://gh-proxy.com/": f"https://gh-proxy.com/{ARCHIVE_URL_FORMAT}",
}
return await __get_fastest_format(formats)
@cached()
async def get_fastest_release_format() -> str:
"""获取最快的发行版资源下载地址格式"""
formats: dict[str, str] = {
"https://objects.githubusercontent.com/": RELEASE_ASSETS_FORMAT,
"https://ghproxy.cc/": f"https://ghproxy.cc/{RELEASE_ASSETS_FORMAT}",
"https://mirror.ghproxy.com/": f"https://mirror.ghproxy.com/{RELEASE_ASSETS_FORMAT}",
"https://gh-proxy.com/": f"https://gh-proxy.com/{RELEASE_ASSETS_FORMAT}",
}
return await __get_fastest_format(formats)
@cached()
async def get_fastest_release_source_format() -> str:
"""获取最快的发行版源码下载地址格式"""
formats: dict[str, str] = {
"https://codeload.github.com/": RELEASE_SOURCE_FORMAT,
"https://p.102333.xyz/": f"https://p.102333.xyz/{RELEASE_SOURCE_FORMAT}",
}
return await __get_fastest_format(formats)

View File

@ -0,0 +1,212 @@
from typing import Protocol
from aiocache import cached
from strenum import StrEnum
from pydantic import BaseModel
from ..http_utils import AsyncHttpx
from .consts import CACHED_API_TTL, GIT_API_TREES_FORMAT, JSD_PACKAGE_API_FORMAT
from .func import (
get_fastest_raw_format,
get_fastest_archive_format,
get_fastest_release_source_format,
)
class RepoInfo(BaseModel):
"""仓库信息"""
owner: str
repo: str
branch: str = "main"
async def get_raw_download_url(self, path: str):
url_format = await get_fastest_raw_format()
return url_format.format(**self.dict(), path=path)
async def get_archive_download_url(self):
url_format = await get_fastest_archive_format()
return url_format.format(**self.dict())
async def get_release_source_download_url_tgz(self, version: str):
url_format = await get_fastest_release_source_format()
return url_format.format(**self.dict(), version=version, compress="tar.gz")
async def get_release_source_download_url_zip(self, version: str):
url_format = await get_fastest_release_source_format()
return url_format.format(**self.dict(), version=version, compress="zip")
class APIStrategy(Protocol):
"""API策略"""
body: BaseModel
async def parse_repo_info(self, repo_info: RepoInfo) -> BaseModel: ...
def get_files(self, module_path: str, is_dir: bool) -> list[str]: ...
class RepoAPI:
"""基础接口"""
def __init__(self, strategy: APIStrategy):
self.strategy = strategy
async def parse_repo_info(self, repo_info: RepoInfo):
body = await self.strategy.parse_repo_info(repo_info)
self.strategy.body = body
def get_files(self, module_path: str, is_dir: bool) -> list[str]:
return self.strategy.get_files(module_path, is_dir)
class FileType(StrEnum):
"""文件类型"""
FILE = "file"
DIR = "directory"
PACKAGE = "gh"
class FileInfo(BaseModel):
"""文件信息"""
type: FileType
name: str
files: list["FileInfo"] = []
class JsdelivrStrategy:
"""Jsdelivr策略"""
body: FileInfo
def get_file_paths(self, module_path: str, is_dir: bool = True) -> list[str]:
"""获取文件路径"""
paths = module_path.split("/")
filename = "" if is_dir else paths[-1]
paths = paths if is_dir else paths[:-1]
cur_file = self.body
for path in paths: # 导航到正确的目录
cur_file = next(
(
f
for f in cur_file.files
if f.type == FileType.DIR and f.name == path
),
None,
)
if not cur_file:
raise ValueError(f"模块路径{module_path}不存在")
def collect_files(file: FileInfo, current_path: str, filename: str):
"""收集文件"""
if file.type == FileType.FILE and (not filename or file.name == filename):
return [f"{current_path}/{file.name}"]
elif file.type == FileType.DIR and file.files:
return [
path
for f in file.files
for path in collect_files(
f,
(
f"{current_path}/{f.name}"
if f.type == FileType.DIR
else current_path
),
filename,
)
]
return []
return collect_files(cur_file, "/".join(paths), filename)
@classmethod
@cached(ttl=CACHED_API_TTL)
async def parse_repo_info(cls, repo_info: RepoInfo) -> "FileInfo":
"""解析仓库信息"""
"""获取插件包信息
参数:
repo_info: 仓库信息
返回:
FileInfo: 插件包信息
"""
jsd_package_url: str = JSD_PACKAGE_API_FORMAT.format(
owner=repo_info.owner, repo=repo_info.repo, branch=repo_info.branch
)
res = await AsyncHttpx.get(url=jsd_package_url)
if res.status_code != 200:
raise ValueError(f"下载错误, code: {res.status_code}")
return FileInfo(**res.json())
def get_files(self, module_path: str, is_dir: bool = True) -> list[str]:
"""获取文件路径"""
return self.get_file_paths(module_path, is_dir)
class TreeType(StrEnum):
"""树类型"""
FILE = "blob"
DIR = "tree"
class Tree(BaseModel):
""""""
path: str
mode: str
type: TreeType
sha: str
size: int | None
url: str
class TreeInfo(BaseModel):
"""树信息"""
sha: str
url: str
tree: list[Tree]
class GitHubStrategy:
"""GitHub策略"""
body: TreeInfo
def export_files(self, module_path: str) -> list[str]:
"""导出文件路径"""
tree_info = self.body
return [
file.path
for file in tree_info.tree
if file.type == TreeType.FILE and file.path.startswith(module_path)
]
@classmethod
@cached(ttl=CACHED_API_TTL)
async def parse_repo_info(cls, repo_info: RepoInfo) -> "TreeInfo":
"""获取仓库树
参数:
repo_info: 仓库信息
返回:
TreesInfo: 仓库树信息
"""
git_tree_url: str = GIT_API_TREES_FORMAT.format(
owner=repo_info.owner, repo=repo_info.repo, branch=repo_info.branch
)
res = await AsyncHttpx.get(url=git_tree_url)
if res.status_code != 200:
raise ValueError(f"下载错误, code: {res.status_code}")
return TreeInfo(**res.json())
def get_files(self, module_path: str, is_dir: bool = True) -> list[str]:
"""获取文件路径"""
return self.export_files(module_path)