Merge branch 'main' into main

This commit is contained in:
HibiKier 2025-06-21 16:15:23 +08:00 committed by GitHub
commit 3ade3756dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 572 additions and 190 deletions

58
.github/workflows/publish-docker.yml vendored Normal file
View File

@ -0,0 +1,58 @@
#
name: Create and publish a Docker image
# Configures this workflow to run on demand via workflow_dispatch.
on:
workflow_dispatch:
# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds.
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
# There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu.
jobs:
build-and-push-image:
runs-on: ubuntu-latest
# Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job.
permissions:
contents: read
packages: write
attestations: write
id-token: write
#
steps:
- name: Checkout repository
uses: actions/checkout@v4
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
# This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
# It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see [Usage](https://github.com/docker/build-push-action#usage) in the README of the `docker/build-push-action` repository.
# It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
- name: Build and push Docker image
id: push
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
# This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see [Using artifact attestations to establish provenance for builds](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds).
- name: Generate artifact attestation
uses: actions/attest-build-provenance@v2
with:
subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}}
subject-digest: ${{ steps.push.outputs.digest }}
push-to-registry: true

View File

@ -11,6 +11,8 @@
"displayname",
"flmt",
"getbbox",
"gitcode",
"GITEE",
"hibiapi",
"httpx",
"jsdelivr",

View File

@ -359,7 +359,7 @@ async def test_add_plugin_exist(
init_mocked_api(mocked_api=mocked_api)
mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins",
"zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins",
return_value=[("search_image", "0.1")],
)
plugin_id = 1

View File

@ -57,7 +57,7 @@ async def test_search_plugin_name(
)
ctx.receive_event(bot=bot, event=event)
mock_table_page.assert_awaited_once_with(
"插件列表",
"商店插件列表",
"通过添加/移除插件 ID 来管理插件",
["-", "ID", "名称", "简介", "作者", "版本", "类型"],
[
@ -123,7 +123,7 @@ async def test_search_plugin_author(
)
ctx.receive_event(bot=bot, event=event)
mock_table_page.assert_awaited_once_with(
"插件列表",
"商店插件列表",
"通过添加/移除插件 ID 来管理插件",
["-", "ID", "名称", "简介", "作者", "版本", "类型"],
[

View File

@ -32,7 +32,7 @@ async def test_update_all_plugin_basic_need_update(
new=tmp_path / "zhenxun",
)
mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins",
"zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins",
return_value=[("search_image", "0.0")],
)
@ -87,7 +87,7 @@ async def test_update_all_plugin_basic_is_new(
new=tmp_path / "zhenxun",
)
mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins",
"zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins",
return_value=[("search_image", "0.1")],
)

View File

@ -32,7 +32,7 @@ async def test_update_plugin_basic_need_update(
new=tmp_path / "zhenxun",
)
mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins",
"zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins",
return_value=[("search_image", "0.0")],
)
@ -87,7 +87,7 @@ async def test_update_plugin_basic_is_new(
new=tmp_path / "zhenxun",
)
mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins",
"zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins",
return_value=[("search_image", "0.1")],
)

View File

@ -1,5 +1,6 @@
{
"鸡汤": {
[
{
"name": "鸡汤",
"module": "jitang",
"module_path": "plugins.alapi.jitang",
"description": "喏,亲手为你煮的鸡汤",
@ -9,7 +10,8 @@
"plugin_type": "NORMAL",
"is_dir": false
},
"识图": {
{
"name": "识图",
"module": "search_image",
"module_path": "plugins.search_image",
"description": "以图搜图,看破本源",
@ -19,7 +21,8 @@
"plugin_type": "NORMAL",
"is_dir": true
},
"网易云热评": {
{
"name": "网易云热评",
"module": "comments_163",
"module_path": "plugins.alapi.comments_163",
"description": "生了个人,我很抱歉",
@ -29,7 +32,8 @@
"plugin_type": "NORMAL",
"is_dir": false
},
"B站订阅": {
{
"name": "B站订阅",
"module": "bilibili_sub",
"module_path": "plugins.bilibili_sub",
"description": "非常便利的B站订阅通知",
@ -39,4 +43,4 @@
"plugin_type": "NORMAL",
"is_dir": true
}
}
]

View File

@ -1,5 +1,6 @@
{
"github订阅": {
[
{
"name": "github订阅",
"module": "github_sub",
"module_path": "github_sub",
"description": "订阅github用户或仓库",
@ -10,7 +11,8 @@
"is_dir": true,
"github_url": "https://github.com/xuanerwa/zhenxun_github_sub"
},
"Minecraft查服": {
{
"name": "Minecraft查服",
"module": "mc_check",
"module_path": "mc_check",
"description": "Minecraft服务器状态查询支持IPv6",
@ -21,4 +23,4 @@
"is_dir": true,
"github_url": "https://github.com/molanp/zhenxun_check_Minecraft"
}
}
]

View File

@ -9,7 +9,7 @@ from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.utils import is_number
from .data_source import ShopManage
from .data_source import StoreManager
__plugin_meta__ = PluginMetadata(
name="插件商店",
@ -82,7 +82,7 @@ _matcher.shortcut(
@_matcher.assign("$main")
async def _(session: EventSession):
try:
result = await ShopManage.get_plugins_info()
result = await StoreManager.get_plugins_info()
logger.info("查看插件列表", "插件商店", session=session)
await MessageUtils.build_message(result).send()
except Exception as e:
@ -97,7 +97,7 @@ async def _(session: EventSession, plugin_id: str):
await MessageUtils.build_message(f"正在添加插件 Id: {plugin_id}").send()
else:
await MessageUtils.build_message(f"正在添加插件 Module: {plugin_id}").send()
result = await ShopManage.add_plugin(plugin_id)
result = await StoreManager.add_plugin(plugin_id)
except Exception as e:
logger.error(f"添加插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
await MessageUtils.build_message(
@ -110,7 +110,7 @@ async def _(session: EventSession, plugin_id: str):
@_matcher.assign("remove")
async def _(session: EventSession, plugin_id: str):
try:
result = await ShopManage.remove_plugin(plugin_id)
result = await StoreManager.remove_plugin(plugin_id)
except Exception as e:
logger.error(f"移除插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
await MessageUtils.build_message(
@ -123,7 +123,7 @@ async def _(session: EventSession, plugin_id: str):
@_matcher.assign("search")
async def _(session: EventSession, plugin_name_or_author: str):
try:
result = await ShopManage.search_plugin(plugin_name_or_author)
result = await StoreManager.search_plugin(plugin_name_or_author)
except Exception as e:
logger.error(
f"搜索插件 name: {plugin_name_or_author}失败",
@ -145,7 +145,7 @@ async def _(session: EventSession, plugin_id: str):
await MessageUtils.build_message(f"正在更新插件 Id: {plugin_id}").send()
else:
await MessageUtils.build_message(f"正在更新插件 Module: {plugin_id}").send()
result = await ShopManage.update_plugin(plugin_id)
result = await StoreManager.update_plugin(plugin_id)
except Exception as e:
logger.error(f"更新插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
await MessageUtils.build_message(
@ -159,7 +159,7 @@ async def _(session: EventSession, plugin_id: str):
async def _(session: EventSession):
try:
await MessageUtils.build_message("正在更新全部插件").send()
result = await ShopManage.update_all_plugin()
result = await StoreManager.update_all_plugin()
except Exception as e:
logger.error("更新全部插件失败", "插件商店", session=session, e=e)
await MessageUtils.build_message(f"更新全部插件失败 e: {e}").finish()

View File

@ -9,3 +9,5 @@ DEFAULT_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins/tree/ma
EXTRA_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins_index/tree/index"
"""插件库索引github仓库地址"""
LOG_COMMAND = "插件商店"

View File

@ -1,6 +1,5 @@
from pathlib import Path
import shutil
import subprocess
from aiocache import cached
import ujson as json
@ -14,9 +13,15 @@ from zhenxun.utils.github_utils import GithubUtils
from zhenxun.utils.github_utils.models import RepoAPI
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
from zhenxun.utils.utils import is_number
from .config import BASE_PATH, DEFAULT_GITHUB_URL, EXTRA_GITHUB_URL
from .config import (
BASE_PATH,
DEFAULT_GITHUB_URL,
EXTRA_GITHUB_URL,
LOG_COMMAND,
)
BAT_FILE = Path() / "win启动.bat"
@ -45,74 +50,69 @@ def install_requirement(plugin_path: Path):
requirement_files = ["requirement.txt", "requirements.txt"]
requirement_paths = [plugin_path / file for file in requirement_files]
existing_requirements = next(
if existing_requirements := next(
(path for path in requirement_paths if path.exists()), None
)
if not existing_requirements:
logger.debug(
f"No requirement.txt found for plugin: {plugin_path.name}", "插件管理"
)
return
try:
command = WIN_COMMAND if BAT_FILE.exists() else DEFAULT_COMMAND
command.append(str(existing_requirements))
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
"Successfully installed dependencies for"
f" plugin: {plugin_path.name}. Output:\n{result.stdout}",
"插件管理",
)
except subprocess.CalledProcessError:
logger.error(
f"Failed to install dependencies for plugin: {plugin_path.name}. "
" Error:\n{e.stderr}"
)
):
VirtualEnvPackageManager.install_requirement(existing_requirements)
class ShopManage:
class StoreManager:
@classmethod
@cached(60)
async def get_data(cls) -> dict[str, StorePluginInfo]:
"""获取插件信息数据
异常:
ValueError: 访问请求失败
async def get_github_plugins(cls) -> list[StorePluginInfo]:
"""获取github插件列表信息
返回:
dict: 插件信息数据
list[StorePluginInfo]: 插件列表数据
"""
default_github_repo = GithubUtils.parse_github_url(DEFAULT_GITHUB_URL)
extra_github_repo = GithubUtils.parse_github_url(EXTRA_GITHUB_URL)
for repo_info in [default_github_repo, extra_github_repo]:
if await repo_info.update_repo_commit():
logger.info(f"获取最新提交: {repo_info.branch}", "插件管理")
else:
logger.warning(f"获取最新提交失败: {repo_info}", "插件管理")
default_github_url = await default_github_repo.get_raw_download_urls(
"plugins.json"
)
extra_github_url = await extra_github_repo.get_raw_download_urls("plugins.json")
res = await AsyncHttpx.get(default_github_url, check_status_code=200)
res2 = await AsyncHttpx.get(extra_github_url, check_status_code=200)
repo_info = GithubUtils.parse_github_url(DEFAULT_GITHUB_URL)
if await repo_info.update_repo_commit():
logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND)
else:
logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND)
default_github_url = await repo_info.get_raw_download_urls("plugins.json")
response = await AsyncHttpx.get(default_github_url, check_status_code=200)
if response.status_code == 200:
logger.info("获取github插件列表成功", LOG_COMMAND)
return [StorePluginInfo(**detail) for detail in json.loads(response.text)]
else:
logger.warning(
f"获取github插件列表失败: {response.status_code}", LOG_COMMAND
)
return []
# 检查请求结果
if res.status_code != 200 or res2.status_code != 200:
raise ValueError(f"下载错误, code: {res.status_code}, {res2.status_code}")
@classmethod
async def get_extra_plugins(cls) -> list[StorePluginInfo]:
"""获取额外插件列表信息
# 解析并合并返回的 JSON 数据
data1 = json.loads(res.text)
data2 = json.loads(res2.text)
return {
name: StorePluginInfo(**detail)
for name, detail in {**data1, **data2}.items()
}
返回:
list[StorePluginInfo]: 插件列表数据
"""
repo_info = GithubUtils.parse_github_url(EXTRA_GITHUB_URL)
if await repo_info.update_repo_commit():
logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND)
else:
logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND)
extra_github_url = await repo_info.get_raw_download_urls("plugins.json")
response = await AsyncHttpx.get(extra_github_url, check_status_code=200)
if response.status_code == 200:
return [StorePluginInfo(**detail) for detail in json.loads(response.text)]
else:
logger.warning(
f"获取github扩展插件列表失败: {response.status_code}", LOG_COMMAND
)
return []
@classmethod
@cached(60)
async def get_data(cls) -> list[StorePluginInfo]:
"""获取插件信息数据
返回:
list[StorePluginInfo]: 插件信息数据
"""
plugins = await cls.get_github_plugins()
extra_plugins = await cls.get_extra_plugins()
return [*plugins, *extra_plugins]
@classmethod
def version_check(cls, plugin_info: StorePluginInfo, suc_plugin: dict[str, str]):
@ -120,7 +120,7 @@ class ShopManage:
参数:
plugin_info: StorePluginInfo
suc_plugin: dict[str, str]
suc_plugin: 模块名: 版本号
返回:
str: 版本号
@ -140,7 +140,7 @@ class ShopManage:
参数:
plugin_info: StorePluginInfo
suc_plugin: dict[str, str]
suc_plugin: 模块名: 版本号
返回:
bool: 是否有更新
@ -164,21 +164,21 @@ class ShopManage:
返回:
BuildImage | str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.get_data()
plugin_list: list[StorePluginInfo] = await cls.get_data()
column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"]
plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "0.1") for p in plugin_list}
db_plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "0.1") for p in db_plugin_list}
data_list = [
[
"已安装" if plugin_info[1].module in suc_plugin else "",
"已安装" if plugin_info.module in suc_plugin else "",
id,
plugin_info[0],
plugin_info[1].description,
plugin_info[1].author,
cls.version_check(plugin_info[1], suc_plugin),
plugin_info[1].plugin_type_name,
plugin_info.name,
plugin_info.description,
plugin_info.author,
cls.version_check(plugin_info, suc_plugin),
plugin_info.plugin_type_name,
]
for id, plugin_info in enumerate(data.items())
for id, plugin_info in enumerate(plugin_list)
]
return await ImageTemplate.table_page(
"插件列表",
@ -198,15 +198,15 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.get_data()
plugin_list: list[StorePluginInfo] = await cls.get_data()
try:
plugin_key = await cls._resolve_plugin_key(plugin_id)
except ValueError as e:
return str(e)
plugin_list = await cls.get_loaded_plugins("module")
plugin_info = data[plugin_key]
if plugin_info.module in [p[0] for p in plugin_list]:
return f"插件 {plugin_key} 已安装,无需重复安装"
db_plugin_list = await cls.get_loaded_plugins("module")
plugin_info = next(p for p in plugin_list if p.module == plugin_key)
if plugin_info.module in [p[0] for p in db_plugin_list]:
return f"插件 {plugin_info.name} 已安装,无需重复安装"
is_external = True
if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL
@ -215,34 +215,39 @@ class ShopManage:
if len(version_split) > 1:
github_url_split = plugin_info.github_url.split("/tree/")
plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}"
logger.info(f"正在安装插件 {plugin_key}...")
logger.info(f"正在安装插件 {plugin_info.name}...", LOG_COMMAND)
await cls.install_plugin_with_repo(
plugin_info.github_url,
plugin_info.module_path,
plugin_info.is_dir,
is_external,
)
return f"插件 {plugin_key} 安装成功! 重启后生效"
return f"插件 {plugin_info.name} 安装成功! 重启后生效"
@classmethod
async def install_plugin_with_repo(
cls, github_url: str, module_path: str, is_dir: bool, is_external: bool = False
cls,
github_url: str,
module_path: str,
is_dir: bool,
is_external: bool = False,
):
files: list[str]
repo_api: RepoAPI
repo_info = GithubUtils.parse_github_url(github_url)
if await repo_info.update_repo_commit():
logger.info(f"获取最新提交: {repo_info.branch}", "插件管理")
logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND)
else:
logger.warning(f"获取最新提交失败: {repo_info}", "插件管理")
logger.debug(f"成功获取仓库信息: {repo_info}", "插件管理")
logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND)
logger.debug(f"成功获取仓库信息: {repo_info}", LOG_COMMAND)
for repo_api in GithubUtils.iter_api_strategies():
try:
await repo_api.parse_repo_info(repo_info)
break
except Exception as e:
logger.warning(
f"获取插件文件失败: {e} | API类型: {repo_api.strategy}", "插件管理"
f"获取插件文件失败 | API类型: {repo_api.strategy}",
LOG_COMMAND,
e=e,
)
continue
else:
@ -258,7 +263,7 @@ class ShopManage:
base_path = BASE_PATH / "plugins" if is_external else BASE_PATH
base_path = base_path if module_path else base_path / repo_info.repo
download_paths: list[Path | str] = [base_path / file for file in files]
logger.debug(f"插件下载路径: {download_paths}", "插件管理")
logger.debug(f"插件下载路径: {download_paths}", LOG_COMMAND)
result = await AsyncHttpx.gather_download_file(download_urls, download_paths)
for _id, success in enumerate(result):
if not success:
@ -273,12 +278,12 @@ class ShopManage:
req_files.extend(
repo_api.get_files(f"{replace_module_path}/requirement.txt", False)
)
logger.debug(f"获取插件依赖文件列表: {req_files}", "插件管理")
logger.debug(f"获取插件依赖文件列表: {req_files}", LOG_COMMAND)
req_download_urls = [
await repo_info.get_raw_download_urls(file) for file in req_files
]
req_paths: list[Path | str] = [plugin_path / file for file in req_files]
logger.debug(f"插件依赖文件下载路径: {req_paths}", "插件管理")
logger.debug(f"插件依赖文件下载路径: {req_paths}", LOG_COMMAND)
if req_files:
result = await AsyncHttpx.gather_download_file(
req_download_urls, req_paths
@ -286,7 +291,7 @@ class ShopManage:
for success in result:
if not success:
raise Exception("插件依赖文件下载失败")
logger.debug(f"插件依赖文件列表: {req_paths}", "插件管理")
logger.debug(f"插件依赖文件列表: {req_paths}", LOG_COMMAND)
install_requirement(plugin_path)
except ValueError as e:
logger.warning("未获取到依赖文件路径...", e=e)
@ -303,12 +308,12 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.get_data()
plugin_list: list[StorePluginInfo] = await cls.get_data()
try:
plugin_key = await cls._resolve_plugin_key(plugin_id)
except ValueError as e:
return str(e)
plugin_info = data[plugin_key]
plugin_info = next(p for p in plugin_list if p.module == plugin_key)
path = BASE_PATH
if plugin_info.github_url:
path = BASE_PATH / "plugins"
@ -317,14 +322,14 @@ class ShopManage:
if not plugin_info.is_dir:
path = Path(f"{path}.py")
if not path.exists():
return f"插件 {plugin_key} 不存在..."
logger.debug(f"尝试移除插件 {plugin_key} 文件: {path}", "插件管理")
return f"插件 {plugin_info.name} 不存在..."
logger.debug(f"尝试移除插件 {plugin_info.name} 文件: {path}", LOG_COMMAND)
if plugin_info.is_dir:
shutil.rmtree(path)
else:
path.unlink()
await PluginInitManager.remove(f"zhenxun.{plugin_info.module_path}")
return f"插件 {plugin_key} 移除成功! 重启后生效"
return f"插件 {plugin_info.name} 移除成功! 重启后生效"
@classmethod
async def search_plugin(cls, plugin_name_or_author: str) -> BuildImage | str:
@ -336,25 +341,25 @@ class ShopManage:
返回:
BuildImage | str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.get_data()
plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list}
plugin_list: list[StorePluginInfo] = await cls.get_data()
db_plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in db_plugin_list}
filtered_data = [
(id, plugin_info)
for id, plugin_info in enumerate(data.items())
if plugin_name_or_author.lower() in plugin_info[0].lower()
or plugin_name_or_author.lower() in plugin_info[1].author.lower()
for id, plugin_info in enumerate(plugin_list)
if plugin_name_or_author.lower() in plugin_info.name.lower()
or plugin_name_or_author.lower() in plugin_info.author.lower()
]
data_list = [
[
"已安装" if plugin_info[1].module in suc_plugin else "",
"已安装" if plugin_info.module in suc_plugin else "",
id,
plugin_info[0],
plugin_info[1].description,
plugin_info[1].author,
cls.version_check(plugin_info[1], suc_plugin),
plugin_info[1].plugin_type_name,
plugin_info.name,
plugin_info.description,
plugin_info.author,
cls.version_check(plugin_info, suc_plugin),
plugin_info.plugin_type_name,
]
for id, plugin_info in filtered_data
]
@ -362,7 +367,7 @@ class ShopManage:
return "未找到相关插件..."
column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"]
return await ImageTemplate.table_page(
"插件列表",
"商店插件列表",
"通过添加/移除插件 ID 来管理插件",
column_name,
data_list,
@ -379,20 +384,20 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.get_data()
plugin_list: list[StorePluginInfo] = await cls.get_data()
try:
plugin_key = await cls._resolve_plugin_key(plugin_id)
except ValueError as e:
return str(e)
logger.info(f"尝试更新插件 {plugin_key}", "插件管理")
plugin_info = data[plugin_key]
plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list}
if plugin_info.module not in [p[0] for p in plugin_list]:
return f"插件 {plugin_key} 未安装,无法更新"
logger.debug(f"当前插件列表: {suc_plugin}", "插件管理")
plugin_info = next(p for p in plugin_list if p.module == plugin_key)
logger.info(f"尝试更新插件 {plugin_info.name}", LOG_COMMAND)
db_plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in db_plugin_list}
if plugin_info.module not in [p[0] for p in db_plugin_list]:
return f"插件 {plugin_info.name} 未安装,无法更新"
logger.debug(f"当前插件列表: {suc_plugin}", LOG_COMMAND)
if cls.check_version_is_new(plugin_info, suc_plugin):
return f"插件 {plugin_key} 已是最新版本"
return f"插件 {plugin_info.name} 已是最新版本"
is_external = True
if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL
@ -403,7 +408,7 @@ class ShopManage:
plugin_info.is_dir,
is_external,
)
return f"插件 {plugin_key} 更新成功! 重启后生效"
return f"插件 {plugin_info.name} 更新成功! 重启后生效"
@classmethod
async def update_all_plugin(cls) -> str:
@ -415,24 +420,33 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict[str, StorePluginInfo] = await cls.get_data()
plugin_list = list(data.keys())
plugin_list: list[StorePluginInfo] = await cls.get_data()
plugin_name_list = [p.name for p in plugin_list]
update_failed_list = []
update_success_list = []
result = "--已更新{}个插件 {}个失败 {}个成功--"
logger.info(f"尝试更新全部插件 {plugin_list}", "插件管理")
for plugin_key in plugin_list:
logger.info(f"尝试更新全部插件 {plugin_name_list}", LOG_COMMAND)
for plugin_info in plugin_list:
try:
plugin_info = data[plugin_key]
plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list}
if plugin_info.module not in [p[0] for p in plugin_list]:
logger.debug(f"插件 {plugin_key} 未安装,跳过", "插件管理")
db_plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in db_plugin_list}
if plugin_info.module not in [p[0] for p in db_plugin_list]:
logger.debug(
f"插件 {plugin_info.name}({plugin_info.module}) 未安装,跳过",
LOG_COMMAND,
)
continue
if cls.check_version_is_new(plugin_info, suc_plugin):
logger.debug(f"插件 {plugin_key} 已是最新版本,跳过", "插件管理")
logger.debug(
f"插件 {plugin_info.name}({plugin_info.module}) 已是最新版本"
",跳过",
LOG_COMMAND,
)
continue
logger.info(f"正在更新插件 {plugin_key}", "插件管理")
logger.info(
f"正在更新插件 {plugin_info.name}({plugin_info.module})",
LOG_COMMAND,
)
is_external = True
if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL
@ -443,10 +457,14 @@ class ShopManage:
plugin_info.is_dir,
is_external,
)
update_success_list.append(plugin_key)
update_success_list.append(plugin_info.name)
except Exception as e:
logger.error(f"更新插件 {plugin_key} 失败: {e}", "插件管理")
update_failed_list.append(plugin_key)
logger.error(
f"更新插件 {plugin_info.name}({plugin_info.module}) 失败",
LOG_COMMAND,
e=e,
)
update_failed_list.append(plugin_info.name)
if not update_success_list and not update_failed_list:
return "全部插件已是最新版本"
if update_success_list:
@ -468,13 +486,13 @@ class ShopManage:
@classmethod
async def _resolve_plugin_key(cls, plugin_id: str) -> str:
data: dict[str, StorePluginInfo] = await cls.get_data()
plugin_list: list[StorePluginInfo] = await cls.get_data()
if is_number(plugin_id):
idx = int(plugin_id)
if idx < 0 or idx >= len(data):
if idx < 0 or idx >= len(plugin_list):
raise ValueError("插件ID不存在...")
return list(data.keys())[idx]
return plugin_list[idx].module
elif isinstance(plugin_id, str):
if plugin_id not in [v.module for k, v in data.items()]:
if plugin_id not in [v.module for v in plugin_list]:
raise ValueError("插件Module不存在...")
return {v.module: k for k, v in data.items()}[plugin_id]
return plugin_id

View File

@ -1,3 +1,5 @@
from typing import Any, Literal
from nonebot.compat import model_dump
from pydantic import BaseModel
@ -13,9 +15,30 @@ type2name: dict[str, str] = {
}
class GiteeContents(BaseModel):
"""Gitee Api内容"""
type: Literal["file", "dir"]
"""类型"""
size: Any
"""文件大小"""
name: str
"""文件名"""
path: str
"""文件路径"""
url: str
"""文件链接"""
html_url: str
"""文件html链接"""
download_url: str
"""文件raw链接"""
class StorePluginInfo(BaseModel):
"""插件信息"""
name: str
"""插件名"""
module: str
"""模块名"""
module_path: str

View File

@ -1,6 +1,7 @@
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from nonebot import require
from nonebot.compat import model_dump
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.services.log import logger
@ -22,12 +23,12 @@ router = APIRouter(prefix="/store")
async def _() -> Result[dict]:
try:
require("plugin_store")
from zhenxun.builtin_plugins.plugin_store import ShopManage
from zhenxun.builtin_plugins.plugin_store import StoreManager
data = await ShopManage.get_data()
data = await StoreManager.get_data()
plugin_list = [
{**data[name].to_dict(), "name": name, "id": idx}
for idx, name in enumerate(data)
{**model_dump(plugin), "name": plugin.name, "id": idx}
for idx, plugin in enumerate(data)
]
modules = await PluginInfo.filter(load_status=True).values_list(
"module", flat=True
@ -48,9 +49,9 @@ async def _() -> Result[dict]:
async def _(param: PluginIr) -> Result:
try:
require("plugin_store")
from zhenxun.builtin_plugins.plugin_store import ShopManage
from zhenxun.builtin_plugins.plugin_store import StoreManager
result = await ShopManage.add_plugin(param.id) # type: ignore
result = await StoreManager.add_plugin(param.id) # type: ignore
return Result.ok(info=result)
except Exception as e:
return Result.fail(f"安装插件失败: {type(e)}: {e}")
@ -66,9 +67,9 @@ async def _(param: PluginIr) -> Result:
async def _(param: PluginIr) -> Result:
try:
require("plugin_store")
from zhenxun.builtin_plugins.plugin_store import ShopManage
from zhenxun.builtin_plugins.plugin_store import StoreManager
result = await ShopManage.update_plugin(param.id) # type: ignore
result = await StoreManager.update_plugin(param.id) # type: ignore
return Result.ok(info=result)
except Exception as e:
return Result.fail(f"更新插件失败: {type(e)}: {e}")
@ -84,9 +85,9 @@ async def _(param: PluginIr) -> Result:
async def _(param: PluginIr) -> Result:
try:
require("plugin_store")
from zhenxun.builtin_plugins.plugin_store import ShopManage
from zhenxun.builtin_plugins.plugin_store import StoreManager
result = await ShopManage.remove_plugin(param.id) # type: ignore
result = await StoreManager.remove_plugin(param.id) # type: ignore
return Result.ok(info=result)
except Exception as e:
return Result.fail(f"移除插件失败: {type(e)}: {e}")

View File

@ -287,8 +287,10 @@ class ConfigsManager:
try:
return config.arg_parser(value_to_process)
except Exception as e:
logger.warning(
f"arg_parser 执行失败 (key: {key}),将尝试其他方法。", e=e
logger.debug(
f"配置项类型转换 MODULE: [<u><y>{module}</y></u>]"
f" | KEY: [<u><y>{key}</y></u>] 将使用原始值",
e=e,
)
if config.type:

View File

@ -21,6 +21,9 @@ CACHED_API_TTL = 300
RAW_CONTENT_FORMAT = "https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
"""raw content格式"""
GITEE_RAW_CONTENT_FORMAT = "https://gitee.com/{owner}/{repo}/raw/main/{path}"
"""gitee raw content格式"""
ARCHIVE_URL_FORMAT = "https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip"
"""archive url格式"""

View File

@ -4,6 +4,7 @@ from zhenxun.utils.http_utils import AsyncHttpx
from .const import (
ARCHIVE_URL_FORMAT,
GITEE_RAW_CONTENT_FORMAT,
RAW_CONTENT_FORMAT,
RELEASE_ASSETS_FORMAT,
RELEASE_SOURCE_FORMAT,
@ -21,6 +22,7 @@ async def __get_fastest_formats(formats: dict[str, str]) -> list[str]:
async def get_fastest_raw_formats() -> list[str]:
"""获取最快的raw下载地址格式"""
formats: dict[str, str] = {
"https://gitee.com/": GITEE_RAW_CONTENT_FORMAT,
"https://raw.githubusercontent.com/": RAW_CONTENT_FORMAT,
"https://ghproxy.cc/": f"https://ghproxy.cc/{RAW_CONTENT_FORMAT}",
"https://gh-proxy.com/": f"https://gh-proxy.com/{RAW_CONTENT_FORMAT}",

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,36 @@
from abc import ABC
from typing import Literal
from pydantic import BaseModel
class Style(BaseModel):
"""常用样式"""
padding: str = "0px"
margin: str = "0px"
border: str = "0px"
border_radius: str = "0px"
text_align: Literal["left", "right", "center"] = "left"
color: str = "#000"
font_size: str = "16px"
class Component(ABC):
def __init__(self, background_color: str = "#fff", is_container: bool = False):
self.extra_style = []
self.style = Style()
self.background_color = background_color
self.is_container = is_container
self.children = []
def add_child(self, child: "Component | str"):
self.children.append(child)
def set_style(self, style: Style):
self.style = style
def add_style(self, style: str):
self.extra_style.append(style)
def to_html(self) -> str: ...

View File

@ -0,0 +1,15 @@
from ..component import Component, Style
from ..container import Row
class Title(Component):
def __init__(self, text: str, color: str = "#000"):
self.text = text
self.color = color
def build(self):
row = Row()
style = Style(font_size="36px", color=self.color)
row.set_style(style)
# def

View File

@ -0,0 +1,31 @@
from .component import Component
class Row(Component):
def __init__(self, background_color: str = "#fff"):
super().__init__(background_color, True)
class Col(Component):
def __init__(self, background_color: str = "#fff"):
super().__init__(background_color, True)
class Container(Component):
def __init__(self, background_color: str = "#fff"):
super().__init__(background_color, True)
self.children = []
class GlobalOverview:
def __init__(self, name: str):
self.name = name
self.class_name: dict[str, list[str]] = {}
self.content = None
def set_content(self, content: Container):
self.content = content
def add_class(self, class_name: str, contents: list[str]):
"""全局样式"""
self.class_name[class_name] = contents

View File

@ -7,10 +7,9 @@ from typing import Any, ClassVar, Literal, cast
import aiofiles
import httpx
from httpx import AsyncHTTPTransport, HTTPStatusError, Response
from httpx import AsyncHTTPTransport, HTTPStatusError, Proxy, Response
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_htmlrender import get_browser
from packaging.version import parse as parse_version
from playwright.async_api import Page
from rich.progress import (
BarColumn,
@ -25,23 +24,41 @@ from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.user_agent import get_user_agent
CLIENT_KEY = ["use_proxy", "proxy", "verify", "headers"]
CLIENT_KEY = ["use_proxy", "proxies", "proxy", "verify", "headers"]
def get_async_client(
proxies: dict[str, str] | None = None, verify: bool = False, **kwargs
proxies: dict[str, str] | None = None,
proxy: str | None = None,
verify: bool = False,
**kwargs,
) -> httpx.AsyncClient:
check_httpx_version = parse_version(httpx.__version__) >= parse_version("0.28.0")
transport = kwargs.pop("transport", None) or AsyncHTTPTransport(verify=verify)
if not check_httpx_version:
return httpx.AsyncClient(proxies=proxies, transport=transport, **kwargs) # type: ignore
proxy_str = None
if proxies:
proxy_str = proxies.get("http://") or proxies.get("https://")
if not proxy_str:
logger.warning(f"代理字典 {proxies} 中未能提取出有效的URL代理已被忽略。")
return httpx.AsyncClient(proxy=proxy_str, transport=transport, **kwargs) # type: ignore
http_proxy = proxies.get("http://")
https_proxy = proxies.get("https://")
return httpx.AsyncClient(
mounts={
"http://": AsyncHTTPTransport(
proxy=Proxy(http_proxy) if http_proxy else None
),
"https://": AsyncHTTPTransport(
proxy=Proxy(https_proxy) if https_proxy else None
),
},
transport=transport,
**kwargs,
)
elif proxy:
return httpx.AsyncClient(
mounts={
"http://": AsyncHTTPTransport(proxy=Proxy(proxy)),
"https://": AsyncHTTPTransport(proxy=Proxy(proxy)),
},
transport=transport,
**kwargs,
)
return httpx.AsyncClient(transport=transport, **kwargs)
class AsyncHttpx:
@ -60,7 +77,8 @@ class AsyncHttpx:
cls,
*,
use_proxy: bool = True,
proxy: dict[str, str] | None = None,
proxies: dict[str, str] | None = None,
proxy: str | None = None,
headers: dict[str, str] | None = None,
verify: bool = False,
**kwargs,
@ -72,7 +90,8 @@ class AsyncHttpx:
参数:
use_proxy: 是否使用在类中定义的默认代理
proxy: 手动指定的代理会覆盖默认代理
proxies: 手动指定的代理会覆盖默认代理
proxy: 单个代理,用于兼容旧版本不再使用
headers: 需要合并到客户端的自定义请求头
verify: 是否验证 SSL 证书
**kwargs: 其他所有传递给 httpx.AsyncClient 的参数
@ -80,14 +99,18 @@ class AsyncHttpx:
返回:
AsyncGenerator[httpx.AsyncClient, None]: 生成器
"""
proxies_to_use = proxy or (cls.default_proxy if use_proxy else None)
proxies_to_use = proxies or (cls.default_proxy if use_proxy else None)
final_headers = get_user_agent()
if headers:
final_headers.update(headers)
async with get_async_client(
proxies=proxies_to_use, verify=verify, headers=final_headers, **kwargs
proxies=proxies_to_use,
proxy=proxy,
verify=verify,
headers=final_headers,
**kwargs,
) as client:
yield client

View File

@ -0,0 +1,159 @@
from pathlib import Path
import subprocess
from subprocess import CalledProcessError
from typing import ClassVar
from zhenxun.services.log import logger
BAT_FILE = Path() / "win启动.bat"
LOG_COMMAND = "VirtualEnvPackageManager"
class VirtualEnvPackageManager:
WIN_COMMAND: ClassVar[list[str]] = [
"./Python310/python.exe",
"-m",
"pip",
]
DEFAULT_COMMAND: ClassVar[list[str]] = ["poetry", "run", "pip"]
@classmethod
def __get_command(cls) -> list[str]:
return cls.WIN_COMMAND if BAT_FILE.exists() else cls.DEFAULT_COMMAND
@classmethod
def install(cls, package: list[str] | str):
"""安装依赖包
参数:
package: 安装依赖包名称或列表
"""
if isinstance(package, str):
package = [package]
try:
command = cls.__get_command()
command.append("install")
command.append(" ".join(package))
logger.info(f"执行虚拟环境安装包指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
f"安装虚拟环境包指令执行完成: {result.stdout}",
LOG_COMMAND,
)
except CalledProcessError as e:
logger.error(f"安装虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND)
@classmethod
def uninstall(cls, package: list[str] | str):
"""卸载依赖包
参数:
package: 卸载依赖包名称或列表
"""
if isinstance(package, str):
package = [package]
try:
command = cls.__get_command()
command.append("uninstall")
command.append(" ".join(package))
logger.info(f"执行虚拟环境卸载包指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
f"卸载虚拟环境包指令执行完成: {result.stdout}",
LOG_COMMAND,
)
except CalledProcessError as e:
logger.error(f"卸载虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND)
@classmethod
def update(cls, package: list[str] | str):
"""更新依赖包
参数:
package: 更新依赖包名称或列表
"""
if isinstance(package, str):
package = [package]
try:
command = cls.__get_command()
command.append("install")
command.append("--upgrade")
command.append(" ".join(package))
logger.info(f"执行虚拟环境更新包指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(f"更新虚拟环境包指令执行完成: {result.stdout}", LOG_COMMAND)
except CalledProcessError as e:
logger.error(f"更新虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND)
@classmethod
def install_requirement(cls, requirement_file: Path):
"""安装依赖文件
参数:
requirement_file: requirement文件路径
异常:
FileNotFoundError: 文件不存在
"""
if not requirement_file.exists():
raise FileNotFoundError(f"依赖文件 {requirement_file} 不存在", LOG_COMMAND)
try:
command = cls.__get_command()
command.append("install")
command.append("-r")
command.append(str(requirement_file.absolute()))
logger.info(f"执行虚拟环境安装依赖文件指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
f"安装虚拟环境依赖文件指令执行完成: {result.stdout}",
LOG_COMMAND,
)
except CalledProcessError as e:
logger.error(
f"安装虚拟环境依赖文件指令执行失败: {e.stderr}.",
LOG_COMMAND,
)
@classmethod
def list(cls) -> str:
"""列出已安装的依赖包"""
try:
command = cls.__get_command()
command.append("list")
logger.info(f"执行虚拟环境列出包指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
f"列出虚拟环境包指令执行完成: {result.stdout}",
LOG_COMMAND,
)
return result.stdout
except CalledProcessError as e:
logger.error(f"列出虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND)
return ""