Merge branch 'main' into feature/webui-config

This commit is contained in:
HibiKier 2025-05-15 23:59:00 +08:00 committed by GitHub
commit f6763270ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 3110 additions and 1111 deletions

View File

@ -16,6 +16,7 @@
"jsdelivr",
"kaiheila",
"lolicon",
"Mahiro",
"nonebot",
"onebot",
"pixiv",
@ -24,9 +25,9 @@
"tobytes",
"ujson",
"unban",
"Uninfo",
"userinfo",
"zhenxun",
"jsdelivr"
"zhenxun"
],
"python.analysis.autoImportCompletions": true,
"python.testing.pytestArgs": ["tests"],

View File

@ -44,7 +44,7 @@
<div align=center>
[文档](https://hibikier.github.io/zhenxun_bot/)
[文档](https://zhenxun-org.github.io/zhenxun_bot/)
</div>
@ -124,7 +124,7 @@ AccessToken: PUBLIC_ZHENXUN_TEST
- 通过 Config 配置项将所有插件配置统计保存至 config.yaml利于统一用户修改
- 方便增删插件,原生 nonebot2 matcher不需要额外修改仅仅通过简单的配置属性就可以生成`帮助图片`和`帮助信息`
- 提供了 cd阻塞每日次数等限制仅仅通过简单的属性就可以生成一个限制例如`PluginCdBlock` 等
- **更多详细请通过 [传送门](https://hibikier.github.io/zhenxun_bot/) 查看文档!**
- **更多详细请通过 [传送门](https://zhenxun-org.github.io/zhenxun_bot/) 查看文档!**
## 🐣 小白整合

67
docker-compose-dev.yml Normal file
View File

@ -0,0 +1,67 @@
services:
db:
image: postgres:15
ports:
- "5432:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
POSTGRES_DB: zhenxun
volumes:
- pgdata:/var/lib/postgresql/data
labels:
- "prometheus.io/scrape=true"
- "prometheus.io/port=9187"
postgres-exporter:
image: prometheuscommunity/postgres-exporter
environment:
DATA_SOURCE_NAME: "postgresql://postgres:password@db:5432/zhenxun?sslmode=disable"
ports:
- "9187:9187"
depends_on:
- db
redis:
image: redis:7
ports:
- "6379:6379"
labels:
- "prometheus.io/scrape=true"
- "prometheus.io/port=9121"
redis-exporter:
image: oliver006/redis_exporter
environment:
REDIS_ADDR: redis://redis:6379
ports:
- "9121:9121"
depends_on:
- redis
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
grafana:
image: grafana/grafana
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
depends_on:
- prometheus
volumes:
pgdata:
prometheus_data:
grafana_data:

1696
poetry.lock generated

File diff suppressed because it is too large Load Diff

12
prometheus.yml Normal file
View File

@ -0,0 +1,12 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'postgresql'
static_configs:
- targets: [ 'postgres-exporter:9187' ]
- job_name: 'redis'
static_configs:
- targets: [ 'redis-exporter:9121' ]

View File

@ -45,6 +45,7 @@ nonebot-plugin-alconna = "^0.54.0"
tenacity = "^9.0.0"
nonebot-plugin-uninfo = ">0.4.1"
nonebot-plugin-waiter = "^0.8.1"
multidict = ">=6.0.0,!=6.3.2"
[tool.poetry.group.dev.dependencies]
nonebug = "^0.4"

View File

@ -7,7 +7,7 @@ apscheduler==3.11.0 ; python_version >= "3.10" and python_version < "4.0"
arclet-alconna-tools==0.7.10 ; python_version >= "3.10" and python_version < "4.0"
arclet-alconna==1.8.35 ; python_version >= "3.10" and python_version < "4.0"
arrow==1.3.0 ; python_version >= "3.10" and python_version < "4.0"
async-timeout==5.0.1 ; python_version >= "3.10" and python_version < "3.11.0"
async-timeout==5.0.1 ; python_version == "3.10"
asyncpg==0.30.0 ; python_version >= "3.10" and python_version < "4.0"
attrs==25.1.0 ; python_version >= "3.10" and python_version < "4.0"
beautifulsoup4==4.13.3 ; python_version >= "3.10" and python_version < "4.0"
@ -21,7 +21,7 @@ chardet==5.2.0 ; python_version >= "3.10" and python_version < "4.0"
charset-normalizer==3.4.1 ; python_version >= "3.10" and python_version < "4.0"
click==8.1.8 ; python_version >= "3.10" and python_version < "4.0"
cn2an==0.5.23 ; python_version >= "3.10" and python_version < "4.0"
colorama==0.4.6 ; python_version >= "3.10" and python_version < "4.0" and sys_platform == "win32" or python_version >= "3.10" and python_version < "4.0" and platform_system == "Windows"
colorama==0.4.6 ; python_version >= "3.10" and python_version < "4.0" and (platform_system == "Windows" or sys_platform == "win32")
cookiecutter==2.6.0 ; python_version >= "3.10" and python_version < "4.0"
cryptography==44.0.1 ; python_version >= "3.10" and python_version < "4.0"
dateparser==1.2.1 ; python_version >= "3.10" and python_version < "4.0"
@ -60,6 +60,7 @@ nonebot-plugin-session==0.2.3 ; python_version >= "3.10" and python_version < "4
nonebot-plugin-uninfo==0.6.8 ; python_version >= "3.10" and python_version < "4.0"
nonebot-plugin-waiter==0.8.1 ; python_version >= "3.10" and python_version < "4.0"
nonebot2==2.4.1 ; python_version >= "3.10" and python_version < "4.0"
nonebot2[fastapi]==2.4.1 ; python_version >= "3.10" and python_version < "4.0"
noneprompt==0.1.9 ; python_version >= "3.10" and python_version < "4.0"
numpy==2.2.2 ; python_version >= "3.10" and python_version < "4.0"
pillow==10.4.0 ; python_version >= "3.10" and python_version < "4.0"
@ -81,10 +82,10 @@ pygments==2.19.1 ; python_version >= "3.10" and python_version < "4.0"
pygtrie==2.5.0 ; python_version >= "3.10" and python_version < "4.0"
pymdown-extensions==10.14.3 ; python_version >= "3.10" and python_version < "4.0"
pypika-tortoise==0.1.6 ; python_version >= "3.10" and python_version < "4.0"
pypinyin==0.51.0 ; python_version >= "3.10" and python_version < "4.0"
pypinyin==0.51.0 ; python_version >= "3.10" and python_version < "4"
python-dateutil==2.9.0.post0 ; python_version >= "3.10" and python_version < "4.0"
python-dotenv==1.0.1 ; python_version >= "3.10" and python_version < "4.0"
python-jose==3.3.0 ; python_version >= "3.10" and python_version < "4.0"
python-jose[cryptography]==3.3.0 ; python_version >= "3.10" and python_version < "4.0"
python-markdown-math==0.8 ; python_version >= "3.10" and python_version < "4.0"
python-multipart==0.0.9 ; python_version >= "3.10" and python_version < "4.0"
python-slugify==8.0.4 ; python_version >= "3.10" and python_version < "4.0"
@ -94,10 +95,10 @@ pyyaml==6.0.2 ; python_version >= "3.10" and python_version < "4.0"
regex==2024.11.6 ; python_version >= "3.10" and python_version < "4.0"
requests==2.32.3 ; python_version >= "3.10" and python_version < "4.0"
retrying==1.3.4 ; python_version >= "3.10" and python_version < "4.0"
rfc3986==1.5.0 ; python_version >= "3.10" and python_version < "4.0"
rfc3986[idna2008]==1.5.0 ; python_version >= "3.10" and python_version < "4.0"
rich==13.9.4 ; python_version >= "3.10" and python_version < "4.0"
rsa==4.9 ; python_version >= "3.10" and python_version < "4.0"
ruamel-yaml-clib==0.2.12 ; python_version >= "3.10" and python_version < "3.13" and platform_python_implementation == "CPython"
rsa==4.9 ; python_version >= "3.10" and python_version < "4"
ruamel-yaml-clib==0.2.12 ; platform_python_implementation == "CPython" and python_version < "3.13" and python_version >= "3.10"
ruamel-yaml==0.18.10 ; python_version >= "3.10" and python_version < "4.0"
scipy==1.15.1 ; python_version >= "3.10" and python_version < "4.0"
sgmllib3k==1.0.0 ; python_version >= "3.10" and python_version < "4.0"
@ -109,17 +110,17 @@ strenum==0.4.15 ; python_version >= "3.10" and python_version < "4.0"
tarina==0.6.8 ; python_version >= "3.10" and python_version < "4.0"
tenacity==9.0.0 ; python_version >= "3.10" and python_version < "4.0"
text-unidecode==1.3 ; python_version >= "3.10" and python_version < "4.0"
tomli==2.2.1 ; python_version >= "3.10" and python_version < "3.11"
tomli==2.2.1 ; python_version == "3.10"
tomlkit==0.13.2 ; python_version >= "3.10" and python_version < "4.0"
tortoise-orm==0.20.1 ; python_version >= "3.10" and python_version < "4.0"
tortoise-orm[asyncpg]==0.20.0 ; python_version >= "3.10" and python_version < "4.0"
types-python-dateutil==2.9.0.20241206 ; python_version >= "3.10" and python_version < "4.0"
typing-extensions==4.12.2 ; python_version >= "3.10" and python_version < "4.0"
tzdata==2025.1 ; python_version >= "3.10" and python_version < "4.0" and platform_system == "Windows"
tzlocal==5.2 ; python_version >= "3.10" and python_version < "4.0"
ujson==5.10.0 ; python_version >= "3.10" and python_version < "4.0"
urllib3==2.3.0 ; python_version >= "3.10" and python_version < "4.0"
uvicorn==0.34.0 ; python_version >= "3.10" and python_version < "4.0"
uvloop==0.21.0 ; python_version >= "3.10" and python_version < "4.0" and (sys_platform != "win32" and sys_platform != "cygwin") and platform_python_implementation != "PyPy"
uvicorn[standard]==0.34.0 ; python_version >= "3.10" and python_version < "4.0"
uvloop==0.21.0 ; sys_platform != "win32" and sys_platform != "cygwin" and platform_python_implementation != "PyPy" and python_version >= "3.10" and python_version < "4.0"
virtualenv==20.29.2 ; python_version >= "3.10" and python_version < "4.0"
watchfiles==0.24.0 ; python_version >= "3.10" and python_version < "4.0"
wcwidth==0.2.13 ; python_version >= "3.10" and python_version < "4.0"

View File

@ -26,6 +26,21 @@ __plugin_meta__ = PluginMetadata(
_matcher = on_alconna(Alconna("关于"), priority=5, block=True, rule=to_me())
QQ_INFO = """
绪山真寻Bot
版本{version}
简介基于Nonebot2开发支持多平台是一个非常可爱的Bot呀希望与大家要好好相处
""".strip()
INFO = """
绪山真寻Bot
版本{version}
简介基于Nonebot2开发支持多平台是一个非常可爱的Bot呀希望与大家要好好相处
项目地址https://github.com/zhenxun-org/zhenxun_bot
文档地址https://zhenxun-org.github.io/zhenxun_bot/
""".strip()
@_matcher.handle()
async def _(session: Uninfo, arparma: Arparma):
ver_file = Path() / "__version__"
@ -35,25 +50,11 @@ async def _(session: Uninfo, arparma: Arparma):
if text := await f.read():
version = text.split(":")[-1].strip()
if PlatformUtils.is_qbot(session):
info: list[str | Path] = [
f"""
绪山真寻Bot
版本{version}
简介基于Nonebot2开发支持多平台是一个非常可爱的Bot呀希望与大家要好好相处
""".strip()
]
result: list[str | Path] = [QQ_INFO.format(version=version)]
path = DATA_PATH / "about.png"
if path.exists():
info.append(path)
result.append(path)
await MessageUtils.build_message(result).send() # type: ignore
else:
info = [
f"""
绪山真寻Bot
版本{version}
简介基于Nonebot2开发支持多平台是一个非常可爱的Bot呀希望与大家要好好相处
项目地址https://github.com/HibiKier/zhenxun_bot
文档地址https://hibikier.github.io/zhenxun_bot/
""".strip()
]
await MessageUtils.build_message(info).send() # type: ignore
logger.info("查看关于", arparma.header_result, session=session)
await MessageUtils.build_message(INFO.format(version=version)).send()
logger.info("查看关于", arparma.header_result, session=session)

View File

@ -15,7 +15,8 @@ async def get_task() -> dict[str, str] | None:
return {
"name": "被动技能",
"description": "控制群组中的被动技能状态",
"usage": "通过 开启/关闭群被动 来控制群被<br>----------<br>"
"usage": "通过 开启/关闭群被动 来控制群被动 <br>"
+ " 示例:开启/关闭群被动早晚安 <br> ---------- <br> "
+ "<br>".join([task.name for task in task_list]),
}
return None

View File

@ -1,3 +1,5 @@
import os
from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
@ -14,9 +16,9 @@ GROUP_HELP_PATH = DATA_PATH / "group_help"
def delete_help_image(gid: str | None = None):
"""删除帮助图片"""
if gid:
file = GROUP_HELP_PATH / f"{gid}.png"
if file.exists():
file.unlink()
for file in os.listdir(GROUP_HELP_PATH):
if file.startswith(f"{gid}"):
os.remove(GROUP_HELP_PATH / file)
else:
if HELP_FILE.exists():
HELP_FILE.unlink()
@ -196,7 +198,7 @@ class PluginManage:
await PluginInfo.filter(plugin_type=PluginType.NORMAL).update(
default_status=status
)
return f'成功将所有功能进群默认状态修改为: {"开启" if status else "关闭"}'
return f"成功将所有功能进群默认状态修改为: {'开启' if status else '关闭'}"
if group_id:
if group := await GroupConsole.get_or_none(
group_id=group_id, channel_id__isnull=True
@ -213,12 +215,12 @@ class PluginManage:
module_list = [f"<{module}" for module in module_list]
group.block_plugin = ",".join(module_list) + "," # type: ignore
await group.save(update_fields=["block_plugin"])
return f'成功将此群组所有功能状态修改为: {"开启" if status else "关闭"}'
return f"成功将此群组所有功能状态修改为: {'开启' if status else '关闭'}"
return "获取群组失败..."
await PluginInfo.filter(plugin_type=PluginType.NORMAL).update(
status=status, block_type=None if status else BlockType.ALL
)
return f'成功将所有功能全局状态修改为: {"开启" if status else "关闭"}'
return f"成功将所有功能全局状态修改为: {'开启' if status else '关闭'}"
@classmethod
async def is_wake(cls, group_id: str) -> bool:

View File

@ -1,7 +1,6 @@
from nonebot import on_message
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import UniMsg
from nonebot_plugin_apscheduler import scheduler
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import Config
@ -39,45 +38,17 @@ def rule(message: UniMsg) -> bool:
chat_history = on_message(rule=rule, priority=1, block=False)
TEMP_LIST = []
@chat_history.handle()
async def _(message: UniMsg, session: EventSession):
# group_id = session.id3 or session.id2
group_id = session.id2
TEMP_LIST.append(
ChatHistory(
async def handle_message(message: UniMsg, session: EventSession):
"""处理消息存储"""
try:
await ChatHistory.create(
user_id=session.id1,
group_id=group_id,
group_id=session.id2,
text=str(message),
plain_text=message.extract_plain_text(),
bot_id=session.bot_id,
platform=session.platform,
)
)
@scheduler.scheduled_job(
"interval",
minutes=1,
)
async def _():
try:
message_list = TEMP_LIST.copy()
TEMP_LIST.clear()
if message_list:
await ChatHistory.bulk_create(message_list)
logger.debug(f"批量添加聊天记录 {len(message_list)}", "定时任务")
except Exception as e:
logger.error("定时批量添加聊天记录", "定时任务", e=e)
# @test.handle()
# async def _(event: MessageEvent):
# print(await ChatHistory.get_user_msg(event.user_id, "private"))
# print(await ChatHistory.get_user_msg_count(event.user_id, "private"))
# print(await ChatHistory.get_user_msg(event.user_id, "group"))
# print(await ChatHistory.get_user_msg_count(event.user_id, "group"))
# print(await ChatHistory.get_group_msg(event.group_id))
# print(await ChatHistory.get_group_msg_count(event.group_id))
logger.warning("存储聊天记录失败", "chat_history", e=e)

View File

@ -40,7 +40,9 @@ async def create_help_img(
match help_type:
case "html":
result = BuildImage.open(await build_html_image(group_id, is_detail))
result = BuildImage.open(
await build_html_image(session, group_id, is_detail)
)
case "zhenxun":
result = BuildImage.open(
await build_zhenxun_image(session, group_id, is_detail)

View File

@ -1,5 +1,8 @@
from collections.abc import Callable
from nonebot_plugin_uninfo import Uninfo
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.enum import PluginType
@ -27,13 +30,15 @@ async def sort_type() -> dict[str, list[PluginInfo]]:
async def classify_plugin(
group_id: str | None, is_detail: bool, handle: Callable
session: Uninfo, group_id: str | None, is_detail: bool, handle: Callable
) -> dict[str, list]:
"""对插件进行分类并判断状态
参数:
session: Uninfo对象
group_id: 群组id
is_detail: 是否详细帮助
handle: 回调方法
返回:
dict[str, list[Item]]: 分类插件数据
@ -41,9 +46,10 @@ async def classify_plugin(
sort_data = await sort_type()
classify: dict[str, list] = {}
group = await GroupConsole.get_or_none(group_id=group_id) if group_id else None
bot = await BotConsole.get_or_none(bot_id=session.self_id)
for menu, value in sort_data.items():
for plugin in value:
if not classify.get(menu):
classify[menu] = []
classify[menu].append(handle(plugin, group, is_detail))
classify[menu].append(handle(bot, plugin, group, is_detail))
return classify

View File

@ -2,9 +2,11 @@ import os
import random
from nonebot_plugin_htmlrender import template_to_pic
from nonebot_plugin_uninfo import Uninfo
from pydantic import BaseModel
from zhenxun.configs.path_config import TEMPLATE_PATH
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.enum import BlockType
@ -48,11 +50,12 @@ ICON2STR = {
def __handle_item(
plugin: PluginInfo, group: GroupConsole | None, is_detail: bool
bot: BotConsole, plugin: PluginInfo, group: GroupConsole | None, is_detail: bool
) -> Item:
"""构造Item
参数:
bot: BotConsole
plugin: PluginInfo
group: 群组
is_detail: 是否详细
@ -73,10 +76,13 @@ def __handle_item(
]:
sta = 2
if group:
if f"{plugin.module}:super," in group.block_plugin:
if f"{plugin.module}," in group.superuser_block_plugin:
sta = 2
if f"{plugin.module}," in group.block_plugin:
sta = 1
if bot:
if f"{plugin.module}," in bot.block_plugins:
sta = 2
return Item(plugin_name=plugin.name, sta=sta)
@ -119,14 +125,17 @@ def build_plugin_data(classify: dict[str, list[Item]]) -> list[dict[str, str]]:
return plugin_list
async def build_html_image(group_id: str | None, is_detail: bool) -> bytes:
async def build_html_image(
session: Uninfo, group_id: str | None, is_detail: bool
) -> bytes:
"""构造HTML帮助图片
参数:
session: Uninfo
group_id: 群号
is_detail: 是否详细帮助
"""
classify = await classify_plugin(group_id, is_detail, __handle_item)
classify = await classify_plugin(session, group_id, is_detail, __handle_item)
plugin_list = build_plugin_data(classify)
return await template_to_pic(
template_path=str((TEMPLATE_PATH / "menu").absolute()),

View File

@ -6,6 +6,7 @@ from pydantic import BaseModel
from zhenxun.configs.config import BotConfig
from zhenxun.configs.path_config import TEMPLATE_PATH
from zhenxun.configs.utils import PluginExtraData
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.enum import BlockType
@ -21,12 +22,19 @@ class Item(BaseModel):
"""插件命令"""
def __handle_item(plugin: PluginInfo, group: GroupConsole | None, is_detail: bool):
def __handle_item(
bot: BotConsole | None,
plugin: PluginInfo,
group: GroupConsole | None,
is_detail: bool,
):
"""构造Item
参数:
bot: BotConsole
plugin: PluginInfo
group: 群组
is_detail: 是否为详细
返回:
Item: Item
@ -40,6 +48,8 @@ def __handle_item(plugin: PluginInfo, group: GroupConsole | None, is_detail: boo
plugin.name = f"{plugin.name}(不可用)"
elif group and f"{plugin.module}," in group.block_plugin:
plugin.name = f"{plugin.name}(不可用)"
elif bot and f"{plugin.module}," in bot.block_plugins:
plugin.name = f"{plugin.name}(不可用)"
commands = []
nb_plugin = nonebot.get_plugin_by_module_name(plugin.module_path)
if is_detail and nb_plugin and nb_plugin.metadata and nb_plugin.metadata.extra:
@ -142,7 +152,7 @@ async def build_zhenxun_image(
group_id: 群号
is_detail: 是否详细帮助
"""
classify = await classify_plugin(group_id, is_detail, __handle_item)
classify = await classify_plugin(session, group_id, is_detail, __handle_item)
plugin_list = build_plugin_data(classify)
platform = PlatformUtils.get_platform(session)
bot_id = BotConfig.get_qbot_uid(session.self_id) or session.self_id

View File

@ -4,7 +4,7 @@ from datetime import datetime, timedelta
import inspect
import time
from types import MappingProxyType
from typing import Any, ClassVar, Literal
from typing import Any, Literal
from nonebot.adapters import Bot, Event
from nonebot.compat import model_dump
@ -65,15 +65,15 @@ class ShopParam(BaseModel):
"""道具单次使用数量"""
text: str
"""text"""
send_success_msg: ClassVar[bool] = True
send_success_msg: bool = True
"""是否发送使用成功信息"""
max_num_limit: ClassVar[int] = 1
max_num_limit: int = 1
"""单次使用最大次数"""
session: Uninfo | None = None
"""Uninfo"""
message: UniMsg
"""UniMessage"""
extra_data: ClassVar[dict[str, Any]] = {}
extra_data: dict[str, Any] = Field(default_factory=dict)
"""额外数据"""
class Config:
@ -384,10 +384,10 @@ class ShopManage:
cls.uuid2goods[uuid] = Goods(
model=create_model(
f"{uuid}_model",
send_success_msg=send_success_msg,
max_num_limit=max_num_limit,
__base__=ShopParam,
extra_data=kwargs,
send_success_msg=(bool, Field(default=send_success_msg)),
max_num_limit=(int, Field(default=max_num_limit)),
extra_data=(dict[str, Any], Field(default=kwargs)),
),
params=kwargs,
before_handle=before_handle,

View File

@ -1,32 +1,77 @@
from typing import Annotated
from nonebot import on_command
from nonebot.adapters import Bot
from nonebot.params import Command
from arclet.alconna import AllParam
from nepattern import UnionPattern
from nonebot.adapters import Bot, Event
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_alconna import Text as alcText
from nonebot_plugin_alconna import UniMsg
import nonebot_plugin_alconna as alc
from nonebot_plugin_alconna import (
Alconna,
Args,
on_alconna,
)
from nonebot_plugin_alconna.uniseg.segment import (
At,
AtAll,
Audio,
Button,
Emoji,
File,
Hyper,
Image,
Keyboard,
Reference,
Reply,
Text,
Video,
Voice,
)
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from ._data_source import BroadcastManage
from .broadcast_manager import BroadcastManager
from .message_processor import (
_extract_broadcast_content,
get_broadcast_target_groups,
send_broadcast_and_notify,
)
BROADCAST_SEND_DELAY_RANGE = (1, 3)
__plugin_meta__ = PluginMetadata(
name="广播",
description="昭告天下!",
usage="""
广播 [消息] [图片]
示例广播 你们好
广播 [消息内容]
- 直接发送消息到除当前群组外的所有群组
- 支持文本图片@表情视频等多种消息类型
- 示例广播 你们好
- 示例广播 [图片] 新活动开始啦
广播 + 引用消息
- 将引用的消息作为广播内容发送
- 支持引用普通消息或合并转发消息
- 示例(引用一条消息) 广播
广播撤回
- 撤回最近一次由您触发的广播消息
- 仅能撤回短时间内的消息
- 示例广播撤回
特性
- 在群组中使用广播时不会将消息发送到当前群组
- 在私聊中使用广播时会发送到所有群组
别名
- bc (广播的简写)
- recall (广播撤回的别名)
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
version="1.2",
plugin_type=PluginType.SUPERUSER,
configs=[
RegisterConfig(
@ -42,26 +87,106 @@ __plugin_meta__ = PluginMetadata(
).to_dict(),
)
_matcher = on_command(
"广播", priority=1, permission=SUPERUSER, block=True, rule=to_me()
AnySeg = (
UnionPattern(
[
Text,
Image,
At,
AtAll,
Audio,
Video,
File,
Emoji,
Reply,
Reference,
Hyper,
Button,
Keyboard,
Voice,
]
)
@ "AnySeg"
)
_matcher = on_alconna(
Alconna(
"广播",
Args["content?", AllParam],
),
aliases={"bc"},
priority=1,
permission=SUPERUSER,
block=True,
rule=to_me(),
use_origin=False,
)
_recall_matcher = on_alconna(
Alconna("广播撤回"),
aliases={"recall"},
priority=1,
permission=SUPERUSER,
block=True,
rule=to_me(),
)
@_matcher.handle()
async def _(
async def handle_broadcast(
bot: Bot,
event: Event,
session: EventSession,
message: UniMsg,
command: Annotated[tuple[str, ...], Command()],
arp: alc.Arparma,
):
for msg in message:
if isinstance(msg, alcText) and msg.text.strip().startswith(command[0]):
msg.text = msg.text.replace(command[0], "", 1).strip()
break
await MessageUtils.build_message("正在发送..请等一下哦!").send()
count, error_count = await BroadcastManage.send(bot, message, session)
result = f"成功广播 {count} 个群组"
if error_count:
result += f"\n广播失败 {error_count} 个群组"
await MessageUtils.build_message(f"发送广播完成!\n{result}").send(reply_to=True)
logger.info(f"发送广播信息: {message}", "广播", session=session)
broadcast_content_msg = await _extract_broadcast_content(bot, event, arp, session)
if not broadcast_content_msg:
return
target_groups, enabled_groups = await get_broadcast_target_groups(bot, session)
if not target_groups or not enabled_groups:
return
try:
await send_broadcast_and_notify(
bot, event, broadcast_content_msg, enabled_groups, target_groups, session
)
except Exception as e:
error_msg = "发送广播失败"
BroadcastManager.log_error(error_msg, e, session)
await MessageUtils.build_message(f"{error_msg}").send(reply_to=True)
@_recall_matcher.handle()
async def handle_broadcast_recall(
bot: Bot,
event: Event,
session: EventSession,
):
"""处理广播撤回命令"""
await MessageUtils.build_message("正在尝试撤回最近一次广播...").send()
try:
success_count, error_count = await BroadcastManager.recall_last_broadcast(
bot, session
)
user_id = str(event.get_user_id())
if success_count == 0 and error_count == 0:
await bot.send_private_msg(
user_id=user_id,
message="没有找到最近的广播消息记录,可能已经撤回或超过可撤回时间。",
)
else:
result = f"广播撤回完成!\n成功撤回 {success_count} 条消息"
if error_count:
result += f"\n撤回失败 {error_count} 条消息 (可能已过期或无权限)"
await bot.send_private_msg(user_id=user_id, message=result)
BroadcastManager.log_info(
f"广播撤回完成: 成功 {success_count}, 失败 {error_count}", session
)
except Exception as e:
error_msg = "撤回广播消息失败"
BroadcastManager.log_error(error_msg, e, session)
user_id = str(event.get_user_id())
await bot.send_private_msg(user_id=user_id, message=f"{error_msg}")

View File

@ -1,72 +0,0 @@
import asyncio
import random
from nonebot.adapters import Bot
import nonebot_plugin_alconna as alc
from nonebot_plugin_alconna import Image, UniMsg
from nonebot_plugin_session import EventSession
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
class BroadcastManage:
@classmethod
async def send(
cls, bot: Bot, message: UniMsg, session: EventSession
) -> tuple[int, int]:
"""发送广播消息
参数:
bot: Bot
message: 消息内容
session: Session
返回:
tuple[int, int]: 发送成功的群组数量, 发送失败的群组数量
"""
message_list = []
for msg in message:
if isinstance(msg, alc.Image) and msg.url:
message_list.append(Image(url=msg.url))
elif isinstance(msg, alc.Text):
message_list.append(msg.text)
group_list, _ = await PlatformUtils.get_group_list(bot)
if group_list:
error_count = 0
for group in group_list:
try:
if not await CommonUtils.task_is_block(
bot,
"broadcast", # group.channel_id
group.group_id,
):
target = PlatformUtils.get_target(
group_id=group.group_id, channel_id=group.channel_id
)
if target:
await MessageUtils.build_message(message_list).send(
target, bot
)
logger.debug(
"发送成功",
"广播",
session=session,
target=f"{group.group_id}:{group.channel_id}",
)
await asyncio.sleep(random.randint(1, 3))
else:
logger.warning("target为空", "广播", session=session)
except Exception as e:
error_count += 1
logger.error(
"发送失败",
"广播",
session=session,
target=f"{group.group_id}:{group.channel_id}",
e=e,
)
return len(group_list) - error_count, error_count
return 0, 0

View File

@ -0,0 +1,490 @@
import asyncio
import random
import traceback
from typing import ClassVar
from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import Bot as V11Bot
from nonebot.exception import ActionFailed
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_alconna.uniseg import Receipt, Reference
from nonebot_plugin_session import EventSession
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.platform import PlatformUtils
from .models import BroadcastDetailResult, BroadcastResult
from .utils import custom_nodes_to_v11_nodes, uni_message_to_v11_list_of_dicts
class BroadcastManager:
"""广播管理器"""
_last_broadcast_msg_ids: ClassVar[dict[str, int]] = {}
@staticmethod
def _get_session_info(session: EventSession | None) -> str:
"""获取会话信息字符串"""
if not session:
return ""
try:
platform = getattr(session, "platform", "unknown")
session_id = str(session)
return f"[{platform}:{session_id}]"
except Exception:
return "[session-info-error]"
@staticmethod
def log_error(
message: str, error: Exception, session: EventSession | None = None, **kwargs
):
"""记录错误日志"""
session_info = BroadcastManager._get_session_info(session)
error_type = type(error).__name__
stack_trace = traceback.format_exc()
error_details = f"\n类型: {error_type}\n信息: {error!s}\n堆栈: {stack_trace}"
logger.error(
f"{session_info} {message}{error_details}", "广播", e=error, **kwargs
)
@staticmethod
def log_warning(message: str, session: EventSession | None = None, **kwargs):
"""记录警告级别日志"""
session_info = BroadcastManager._get_session_info(session)
logger.warning(f"{session_info} {message}", "广播", **kwargs)
@staticmethod
def log_info(message: str, session: EventSession | None = None, **kwargs):
"""记录信息级别日志"""
session_info = BroadcastManager._get_session_info(session)
logger.info(f"{session_info} {message}", "广播", **kwargs)
@classmethod
def get_last_broadcast_msg_ids(cls) -> dict[str, int]:
"""获取最近广播消息ID"""
return cls._last_broadcast_msg_ids.copy()
@classmethod
def clear_last_broadcast_msg_ids(cls) -> None:
"""清空消息ID记录"""
cls._last_broadcast_msg_ids.clear()
@classmethod
async def get_all_groups(cls, bot: Bot) -> tuple[list[GroupConsole], str]:
"""获取群组列表"""
return await PlatformUtils.get_group_list(bot)
@classmethod
async def send(
cls, bot: Bot, message: UniMessage, session: EventSession
) -> BroadcastResult:
"""发送广播到所有群组"""
logger.debug(
f"开始广播(send - 广播到所有群组)Bot ID: {bot.self_id}",
"广播",
session=session,
)
logger.debug("清空上一次的广播消息ID记录", "广播", session=session)
cls.clear_last_broadcast_msg_ids()
all_groups, _ = await cls.get_all_groups(bot)
return await cls.send_to_specific_groups(bot, message, all_groups, session)
@classmethod
async def send_to_specific_groups(
cls,
bot: Bot,
message: UniMessage,
target_groups: list[GroupConsole],
session_info: EventSession | str | None = None,
) -> BroadcastResult:
"""发送广播到指定群组"""
log_session = session_info or bot.self_id
logger.debug(
f"开始广播,目标 {len(target_groups)} 个群组Bot ID: {bot.self_id}",
"广播",
session=log_session,
)
if not target_groups:
logger.debug("目标群组列表为空,广播结束", "广播", session=log_session)
return 0, 0
platform = PlatformUtils.get_platform(bot)
is_forward_broadcast = any(
isinstance(seg, Reference) and getattr(seg, "nodes", None)
for seg in message
)
if platform == "qq" and isinstance(bot, V11Bot) and is_forward_broadcast:
if (
len(message) == 1
and isinstance(message[0], Reference)
and getattr(message[0], "nodes", None)
):
nodes_list = getattr(message[0], "nodes", [])
v11_nodes = custom_nodes_to_v11_nodes(nodes_list)
node_count = len(v11_nodes)
logger.debug(
f"从 UniMessage<Reference> 构造转发节点数: {node_count}",
"广播",
session=log_session,
)
else:
logger.warning(
"广播消息包含合并转发段和其他段,将尝试打平成一个节点发送",
"广播",
session=log_session,
)
v11_content_list = uni_message_to_v11_list_of_dicts(message)
v11_nodes = (
[
{
"type": "node",
"data": {
"user_id": bot.self_id,
"nickname": "广播",
"content": v11_content_list,
},
}
]
if v11_content_list
else []
)
if not v11_nodes:
logger.warning(
"构造出的 V11 合并转发节点为空,无法发送",
"广播",
session=log_session,
)
return 0, len(target_groups)
success_count, error_count, skip_count = await cls._broadcast_forward(
bot, log_session, target_groups, v11_nodes
)
else:
if is_forward_broadcast:
logger.warning(
f"合并转发消息在适配器 ({platform}) 不支持,将作为普通消息发送",
"广播",
session=log_session,
)
success_count, error_count, skip_count = await cls._broadcast_normal(
bot, log_session, target_groups, message
)
total = len(target_groups)
stats = f"成功: {success_count}, 失败: {error_count}"
stats += f", 跳过: {skip_count}, 总计: {total}"
logger.debug(
f"广播统计 - {stats}",
"广播",
session=log_session,
)
msg_ids = cls.get_last_broadcast_msg_ids()
if msg_ids:
id_list_str = ", ".join([f"{k}:{v}" for k, v in msg_ids.items()])
logger.debug(
f"广播结束,记录了 {len(msg_ids)} 条消息ID: {id_list_str}",
"广播",
session=log_session,
)
else:
logger.warning(
"广播结束但没有记录任何消息ID",
"广播",
session=log_session,
)
return success_count, error_count
@classmethod
async def _extract_message_id_from_result(
cls,
result: dict | Receipt,
group_key: str,
session_info: EventSession | str,
msg_type: str = "普通",
) -> None:
"""提取消息ID并记录"""
if isinstance(result, dict) and "message_id" in result:
msg_id = result["message_id"]
try:
msg_id_int = int(msg_id)
cls._last_broadcast_msg_ids[group_key] = msg_id_int
logger.debug(
f"记录群 {group_key}{msg_type}消息ID: {msg_id_int}",
"广播",
session=session_info,
)
except (ValueError, TypeError):
logger.warning(
f"{msg_type}结果中的 message_id 不是有效整数: {msg_id}",
"广播",
session=session_info,
)
elif isinstance(result, Receipt) and result.msg_ids:
try:
first_id_info = result.msg_ids[0]
msg_id = None
if isinstance(first_id_info, dict) and "message_id" in first_id_info:
msg_id = first_id_info["message_id"]
logger.debug(
f"从 Receipt.msg_ids[0] 提取到 ID: {msg_id}",
"广播",
session=session_info,
)
elif isinstance(first_id_info, int | str):
msg_id = first_id_info
logger.debug(
f"从 Receipt.msg_ids[0] 提取到原始ID: {msg_id}",
"广播",
session=session_info,
)
if msg_id is not None:
try:
msg_id_int = int(msg_id)
cls._last_broadcast_msg_ids[group_key] = msg_id_int
logger.debug(
f"记录群 {group_key} 的消息ID: {msg_id_int}",
"广播",
session=session_info,
)
except (ValueError, TypeError):
logger.warning(
f"提取的ID ({msg_id}) 不是有效整数",
"广播",
session=session_info,
)
else:
info_str = str(first_id_info)
logger.warning(
f"无法从 Receipt.msg_ids[0] 提取ID: {info_str}",
"广播",
session=session_info,
)
except IndexError:
logger.warning("Receipt.msg_ids 为空", "广播", session=session_info)
except Exception as e_extract:
logger.error(
f"从 Receipt 提取 msg_id 时出错: {e_extract}",
"广播",
session=session_info,
e=e_extract,
)
else:
logger.warning(
f"发送成功但无法从结果获取消息 ID. 结果: {result}",
"广播",
session=session_info,
)
@classmethod
async def _check_group_availability(cls, bot: Bot, group: GroupConsole) -> bool:
"""检查群组是否可用"""
if not group.group_id:
return False
if await CommonUtils.task_is_block(bot, "broadcast", group.group_id):
return False
return True
@classmethod
async def _broadcast_forward(
cls,
bot: V11Bot,
session_info: EventSession | str,
group_list: list[GroupConsole],
v11_nodes: list[dict],
) -> BroadcastDetailResult:
"""发送合并转发"""
success_count = 0
error_count = 0
skip_count = 0
for _, group in enumerate(group_list):
group_key = group.group_id or group.channel_id
if not await cls._check_group_availability(bot, group):
skip_count += 1
continue
try:
result = await bot.send_group_forward_msg(
group_id=int(group.group_id), messages=v11_nodes
)
logger.debug(
f"合并转发消息发送结果: {result}, 类型: {type(result)}",
"广播",
session=session_info,
)
await cls._extract_message_id_from_result(
result, group_key, session_info, "合并转发"
)
success_count += 1
await asyncio.sleep(random.randint(1, 3))
except ActionFailed as af_e:
error_count += 1
logger.error(
f"发送失败(合并转发) to {group_key}: {af_e}",
"广播",
session=session_info,
e=af_e,
)
except Exception as e:
error_count += 1
logger.error(
f"发送失败(合并转发) to {group_key}: {e}",
"广播",
session=session_info,
e=e,
)
return success_count, error_count, skip_count
@classmethod
async def _broadcast_normal(
cls,
bot: Bot,
session_info: EventSession | str,
group_list: list[GroupConsole],
message: UniMessage,
) -> BroadcastDetailResult:
"""发送普通消息"""
success_count = 0
error_count = 0
skip_count = 0
for _, group in enumerate(group_list):
group_key = (
f"{group.group_id}:{group.channel_id}"
if group.channel_id
else str(group.group_id)
)
if not await cls._check_group_availability(bot, group):
skip_count += 1
continue
try:
target = PlatformUtils.get_target(
group_id=group.group_id, channel_id=group.channel_id
)
if target:
receipt: Receipt = await message.send(target, bot=bot)
logger.debug(
f"广播消息发送结果: {receipt}, 类型: {type(receipt)}",
"广播",
session=session_info,
)
await cls._extract_message_id_from_result(
receipt, group_key, session_info
)
success_count += 1
await asyncio.sleep(random.randint(1, 3))
else:
logger.warning(
"target为空", "广播", session=session_info, target=group_key
)
skip_count += 1
except Exception as e:
error_count += 1
logger.error(
f"发送失败(普通) to {group_key}: {e}",
"广播",
session=session_info,
e=e,
)
return success_count, error_count, skip_count
@classmethod
async def recall_last_broadcast(
cls, bot: Bot, session_info: EventSession | str
) -> BroadcastResult:
"""撤回最近广播"""
msg_ids_to_recall = cls.get_last_broadcast_msg_ids()
if not msg_ids_to_recall:
logger.warning(
"没有找到最近的广播消息ID记录", "广播撤回", session=session_info
)
return 0, 0
id_list_str = ", ".join([f"{k}:{v}" for k, v in msg_ids_to_recall.items()])
logger.debug(
f"找到 {len(msg_ids_to_recall)} 条广播消息ID记录: {id_list_str}",
"广播撤回",
session=session_info,
)
success_count = 0
error_count = 0
logger.info(
f"准备撤回 {len(msg_ids_to_recall)} 条广播消息",
"广播撤回",
session=session_info,
)
for group_key, msg_id in msg_ids_to_recall.items():
try:
logger.debug(
f"尝试撤回消息 (ID: {msg_id}) in {group_key}",
"广播撤回",
session=session_info,
)
await bot.call_api("delete_msg", message_id=msg_id)
success_count += 1
except ActionFailed as af_e:
retcode = getattr(af_e, "retcode", None)
wording = getattr(af_e, "wording", "")
if retcode == 100 and "MESSAGE_NOT_FOUND" in wording.upper():
logger.warning(
f"消息 (ID: {msg_id}) 可能已被撤回或不存在于 {group_key}",
"广播撤回",
session=session_info,
)
elif retcode == 300 and "delete message" in wording.lower():
logger.warning(
f"消息 (ID: {msg_id}) 可能已被撤回或不存在于 {group_key}",
"广播撤回",
session=session_info,
)
else:
error_count += 1
logger.error(
f"撤回消息失败 (ID: {msg_id}) in {group_key}: {af_e}",
"广播撤回",
session=session_info,
e=af_e,
)
except Exception as e:
error_count += 1
logger.error(
f"撤回消息时发生未知错误 (ID: {msg_id}) in {group_key}: {e}",
"广播撤回",
session=session_info,
e=e,
)
await asyncio.sleep(0.2)
logger.debug("撤回操作完成清空消息ID记录", "广播撤回", session=session_info)
cls.clear_last_broadcast_msg_ids()
return success_count, error_count

View File

@ -0,0 +1,584 @@
import base64
import json
from typing import Any
from nonebot.adapters import Bot, Event
from nonebot.adapters.onebot.v11 import Message as V11Message
from nonebot.adapters.onebot.v11 import MessageSegment as V11MessageSegment
from nonebot.exception import ActionFailed
import nonebot_plugin_alconna as alc
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_alconna.uniseg.segment import (
At,
AtAll,
CustomNode,
Image,
Reference,
Reply,
Text,
Video,
)
from nonebot_plugin_alconna.uniseg.tools import reply_fetch
from nonebot_plugin_session import EventSession
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.message import MessageUtils
from .broadcast_manager import BroadcastManager
MAX_FORWARD_DEPTH = 3
async def _process_forward_content(
forward_content: Any, forward_id: str | None, bot: Bot, depth: int
) -> list[CustomNode]:
"""处理转发消息内容"""
nodes_for_alc = []
content_parsed = False
if forward_content:
nodes_from_content = None
if isinstance(forward_content, list):
nodes_from_content = forward_content
elif isinstance(forward_content, str):
try:
parsed_content = json.loads(forward_content)
if isinstance(parsed_content, list):
nodes_from_content = parsed_content
except Exception as json_e:
logger.debug(
f"[Depth {depth}] JSON解析失败: {json_e}",
"广播",
)
if nodes_from_content is not None:
logger.debug(
f"[D{depth}] 节点数: {len(nodes_from_content)}",
"广播",
)
content_parsed = True
for node_data in nodes_from_content:
node = await _create_custom_node_from_data(node_data, bot, depth + 1)
if node:
nodes_for_alc.append(node)
if not content_parsed and forward_id:
logger.debug(
f"[D{depth}] 尝试API调用ID: {forward_id}",
"广播",
)
try:
forward_data = await bot.call_api("get_forward_msg", id=forward_id)
nodes_list = None
if isinstance(forward_data, dict) and "messages" in forward_data:
nodes_list = forward_data["messages"]
elif (
isinstance(forward_data, dict)
and "data" in forward_data
and isinstance(forward_data["data"], dict)
and "message" in forward_data["data"]
):
nodes_list = forward_data["data"]["message"]
elif isinstance(forward_data, list):
nodes_list = forward_data
if nodes_list:
node_count = len(nodes_list)
logger.debug(
f"[D{depth + 1}] 节点:{node_count}",
"广播",
)
for node_data in nodes_list:
node = await _create_custom_node_from_data(
node_data, bot, depth + 1
)
if node:
nodes_for_alc.append(node)
else:
logger.warning(
f"[D{depth + 1}] ID:{forward_id}无节点",
"广播",
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="错误",
content="[嵌套转发消息获取失败]",
)
)
except ActionFailed as af_e:
logger.error(
f"[D{depth + 1}] API失败: {af_e}",
"广播",
e=af_e,
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="错误",
content="[嵌套转发消息获取失败]",
)
)
except Exception as e:
logger.error(
f"[D{depth + 1}] 处理出错: {e}",
"广播",
e=e,
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="错误",
content="[处理嵌套转发时出错]",
)
)
elif not content_parsed and not forward_id:
logger.warning(
f"[D{depth}] 转发段无内容也无ID",
"广播",
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="错误",
content="[嵌套转发消息无法解析]",
)
)
elif content_parsed and not nodes_for_alc:
logger.warning(
f"[D{depth}] 解析成功但无有效节点",
"广播",
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="信息",
content="[嵌套转发内容为空]",
)
)
return nodes_for_alc
async def _create_custom_node_from_data(
node_data: dict, bot: Bot, depth: int
) -> CustomNode | None:
"""从节点数据创建CustomNode"""
node_content_raw = node_data.get("message") or node_data.get("content")
if not node_content_raw:
logger.warning(f"[D{depth}] 节点缺少消息内容", "广播")
return None
sender = node_data.get("sender", {})
uid = str(sender.get("user_id", "10000"))
name = sender.get("nickname", f"用户{uid[:4]}")
extracted_uni_msg = await _extract_content_from_message(
node_content_raw, bot, depth
)
if not extracted_uni_msg:
return None
return CustomNode(uid=uid, name=name, content=extracted_uni_msg)
async def _extract_broadcast_content(
bot: Bot,
event: Event,
arp: alc.Arparma,
session: EventSession,
) -> UniMessage | None:
"""从命令参数或引用消息中提取广播内容"""
broadcast_content_msg: UniMessage | None = None
command_content_list = arp.all_matched_args.get("content", [])
processed_command_list = []
has_command_content = False
if command_content_list:
for item in command_content_list:
if isinstance(item, alc.Segment):
processed_command_list.append(item)
if not (isinstance(item, Text) and not item.text.strip()):
has_command_content = True
elif isinstance(item, str):
if item.strip():
processed_command_list.append(Text(item.strip()))
has_command_content = True
else:
logger.warning(
f"Unexpected type in command content: {type(item)}", "广播"
)
if has_command_content:
logger.debug("检测到命令参数内容,优先使用参数内容", "广播", session=session)
broadcast_content_msg = UniMessage(processed_command_list)
if not broadcast_content_msg.filter(
lambda x: not (isinstance(x, Text) and not x.text.strip())
):
logger.warning(
"命令参数内容解析后为空或只包含空白", "广播", session=session
)
broadcast_content_msg = None
if not broadcast_content_msg:
reply_segment_obj: Reply | None = await reply_fetch(event, bot)
if (
reply_segment_obj
and hasattr(reply_segment_obj, "msg")
and reply_segment_obj.msg
):
logger.debug(
"未检测到有效命令参数,检测到引用消息", "广播", session=session
)
raw_quoted_content = reply_segment_obj.msg
is_forward = False
forward_id = None
if isinstance(raw_quoted_content, V11Message):
for seg in raw_quoted_content:
if isinstance(seg, V11MessageSegment):
if seg.type == "forward":
forward_id = seg.data.get("id")
is_forward = bool(forward_id)
break
elif seg.type == "json":
try:
json_data_str = seg.data.get("data", "{}")
if isinstance(json_data_str, str):
import json
json_data = json.loads(json_data_str)
if (
json_data.get("app") == "com.tencent.multimsg"
or json_data.get("view") == "Forward"
) and json_data.get("meta", {}).get(
"detail", {}
).get("resid"):
forward_id = json_data["meta"]["detail"][
"resid"
]
is_forward = True
break
except Exception:
pass
if is_forward and forward_id:
logger.info(
f"尝试获取并构造合并转发内容 (ID: {forward_id})",
"广播",
session=session,
)
nodes_to_forward: list[CustomNode] = []
try:
forward_data = await bot.call_api("get_forward_msg", id=forward_id)
nodes_list = None
if isinstance(forward_data, dict) and "messages" in forward_data:
nodes_list = forward_data["messages"]
elif (
isinstance(forward_data, dict)
and "data" in forward_data
and isinstance(forward_data["data"], dict)
and "message" in forward_data["data"]
):
nodes_list = forward_data["data"]["message"]
elif isinstance(forward_data, list):
nodes_list = forward_data
if nodes_list is not None:
for node_data in nodes_list:
node_sender = node_data.get("sender", {})
node_user_id = str(node_sender.get("user_id", "10000"))
node_nickname = node_sender.get(
"nickname", f"用户{node_user_id[:4]}"
)
node_content_raw = node_data.get(
"message"
) or node_data.get("content")
if node_content_raw:
extracted_node_uni_msg = (
await _extract_content_from_message(
node_content_raw, bot
)
)
if extracted_node_uni_msg:
nodes_to_forward.append(
CustomNode(
uid=node_user_id,
name=node_nickname,
content=extracted_node_uni_msg,
)
)
if nodes_to_forward:
broadcast_content_msg = UniMessage(
Reference(nodes=nodes_to_forward)
)
except ActionFailed:
await MessageUtils.build_message(
"获取合并转发消息失败,可能不支持此 API。"
).send(reply_to=True)
return None
except Exception as api_e:
logger.error(f"处理合并转发时出错: {api_e}", "广播", e=api_e)
await MessageUtils.build_message(
"处理合并转发消息时发生内部错误。"
).send(reply_to=True)
return None
else:
broadcast_content_msg = await _extract_content_from_message(
raw_quoted_content, bot
)
else:
logger.debug("未检测到命令参数和引用消息", "广播", session=session)
await MessageUtils.build_message("请提供广播内容或引用要广播的消息").send(
reply_to=True
)
return None
if not broadcast_content_msg:
logger.error(
"未能从命令参数或引用消息中获取有效的广播内容", "广播", session=session
)
await MessageUtils.build_message("错误:未能获取有效的广播内容。").send(
reply_to=True
)
return None
return broadcast_content_msg
async def _process_v11_segment(
seg_obj: V11MessageSegment | dict, depth: int, index: int, bot: Bot
) -> list[alc.Segment]:
"""处理V11消息段"""
result = []
seg_type = None
data_dict = None
if isinstance(seg_obj, V11MessageSegment):
seg_type = seg_obj.type
data_dict = seg_obj.data
elif isinstance(seg_obj, dict):
seg_type = seg_obj.get("type")
data_dict = seg_obj.get("data")
else:
return result
if not (seg_type and data_dict is not None):
logger.warning(f"[D{depth}] 跳过无效数据: {type(seg_obj)}", "广播")
return result
if seg_type == "text":
text_content = data_dict.get("text", "")
if isinstance(text_content, str) and text_content.strip():
result.append(Text(text_content))
elif seg_type == "image":
img_seg = None
if data_dict.get("url"):
img_seg = Image(url=data_dict["url"])
elif data_dict.get("file"):
file_val = data_dict["file"]
if isinstance(file_val, str) and file_val.startswith("base64://"):
b64_data = file_val[9:]
raw_bytes = base64.b64decode(b64_data)
img_seg = Image(raw=raw_bytes)
else:
img_seg = Image(path=file_val)
if img_seg:
result.append(img_seg)
else:
logger.warning(f"[Depth {depth}] V11 图片 {index} 缺少URL/文件", "广播")
elif seg_type == "at":
target_qq = data_dict.get("qq", "")
if target_qq.lower() == "all":
result.append(AtAll())
elif target_qq:
result.append(At(flag="user", target=target_qq))
elif seg_type == "video":
video_seg = None
if data_dict.get("url"):
video_seg = Video(url=data_dict["url"])
elif data_dict.get("file"):
file_val = data_dict["file"]
if isinstance(file_val, str) and file_val.startswith("base64://"):
b64_data = file_val[9:]
raw_bytes = base64.b64decode(b64_data)
video_seg = Video(raw=raw_bytes)
else:
video_seg = Video(path=file_val)
if video_seg:
result.append(video_seg)
logger.debug(f"[Depth {depth}] 处理视频消息成功", "广播")
else:
logger.warning(f"[Depth {depth}] V11 视频 {index} 缺少URL/文件", "广播")
elif seg_type == "forward":
nested_forward_id = data_dict.get("id") or data_dict.get("resid")
nested_forward_content = data_dict.get("content")
logger.debug(f"[D{depth}] 嵌套转发ID: {nested_forward_id}", "广播")
nested_nodes = await _process_forward_content(
nested_forward_content, nested_forward_id, bot, depth
)
if nested_nodes:
result.append(Reference(nodes=nested_nodes))
else:
logger.warning(f"[D{depth}] 跳过类型: {seg_type}", "广播")
return result
async def _extract_content_from_message(
message_content: Any, bot: Bot, depth: int = 0
) -> UniMessage:
"""提取消息内容到UniMessage"""
temp_msg = UniMessage()
input_type_str = str(type(message_content))
if depth >= MAX_FORWARD_DEPTH:
logger.warning(
f"[Depth {depth}] 达到最大递归深度 {MAX_FORWARD_DEPTH},停止解析嵌套转发。",
"广播",
)
temp_msg.append(Text("[嵌套转发层数过多,内容已省略]"))
return temp_msg
segments_to_process = []
if isinstance(message_content, UniMessage):
segments_to_process = list(message_content)
elif isinstance(message_content, V11Message):
segments_to_process = list(message_content)
elif isinstance(message_content, list):
segments_to_process = message_content
elif (
isinstance(message_content, dict)
and "type" in message_content
and "data" in message_content
):
segments_to_process = [message_content]
elif isinstance(message_content, str):
if message_content.strip():
temp_msg.append(Text(message_content))
return temp_msg
else:
logger.warning(f"[Depth {depth}] 无法处理的输入类型: {input_type_str}", "广播")
return temp_msg
if segments_to_process:
for index, seg_obj in enumerate(segments_to_process):
try:
if isinstance(seg_obj, Text):
text_content = getattr(seg_obj, "text", None)
if isinstance(text_content, str) and text_content.strip():
temp_msg.append(seg_obj)
elif isinstance(seg_obj, Image):
if (
getattr(seg_obj, "url", None)
or getattr(seg_obj, "path", None)
or getattr(seg_obj, "raw", None)
):
temp_msg.append(seg_obj)
elif isinstance(seg_obj, At):
temp_msg.append(seg_obj)
elif isinstance(seg_obj, AtAll):
temp_msg.append(seg_obj)
elif isinstance(seg_obj, Video):
if (
getattr(seg_obj, "url", None)
or getattr(seg_obj, "path", None)
or getattr(seg_obj, "raw", None)
):
temp_msg.append(seg_obj)
logger.debug(f"[D{depth}] 处理Video对象成功", "广播")
else:
processed_segments = await _process_v11_segment(
seg_obj, depth, index, bot
)
temp_msg.extend(processed_segments)
except Exception as e_conv_seg:
logger.warning(
f"[D{depth}] 处理段 {index} 出错: {e_conv_seg}",
"广播",
e=e_conv_seg,
)
if not temp_msg and message_content:
logger.warning(f"未能从类型 {input_type_str} 中提取内容", "广播")
return temp_msg
async def get_broadcast_target_groups(
bot: Bot, session: EventSession
) -> tuple[list, list]:
"""获取广播目标群组和启用了广播功能的群组"""
target_groups = []
all_groups, _ = await BroadcastManager.get_all_groups(bot)
current_group_id = None
if hasattr(session, "id2") and session.id2:
current_group_id = session.id2
if current_group_id:
target_groups = [
group for group in all_groups if group.group_id != current_group_id
]
logger.info(
f"向除当前群组({current_group_id})外的所有群组广播", "广播", session=session
)
else:
target_groups = all_groups
logger.info("向所有群组广播", "广播", session=session)
if not target_groups:
await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send(
reply_to=True
)
return [], []
enabled_groups = []
for group in target_groups:
if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id):
enabled_groups.append(group)
if not enabled_groups:
await MessageUtils.build_message(
"没有启用了广播功能的目标群组可供立即发送。"
).send(reply_to=True)
return target_groups, []
return target_groups, enabled_groups
async def send_broadcast_and_notify(
bot: Bot,
event: Event,
message: UniMessage,
enabled_groups: list,
target_groups: list,
session: EventSession,
) -> None:
"""发送广播并通知结果"""
BroadcastManager.clear_last_broadcast_msg_ids()
count, error_count = await BroadcastManager.send_to_specific_groups(
bot, message, enabled_groups, session
)
result = f"成功广播 {count} 个群组"
if error_count:
result += f"\n发送失败 {error_count} 个群组"
result += f"\n有效: {len(enabled_groups)} / 总计: {len(target_groups)}"
user_id = str(event.get_user_id())
await bot.send_private_msg(user_id=user_id, message=f"发送广播完成!\n{result}")
BroadcastManager.log_info(
f"广播完成,有效/总计: {len(enabled_groups)}/{len(target_groups)}",
session,
)

View File

@ -0,0 +1,64 @@
from datetime import datetime
from typing import Any
from nonebot_plugin_alconna import UniMessage
from zhenxun.models.group_console import GroupConsole
GroupKey = str
MessageID = int
BroadcastResult = tuple[int, int]
BroadcastDetailResult = tuple[int, int, int]
class BroadcastTarget:
"""广播目标"""
def __init__(self, group_id: str, channel_id: str | None = None):
self.group_id = group_id
self.channel_id = channel_id
def to_dict(self) -> dict[str, str | None]:
"""转换为字典格式"""
return {"group_id": self.group_id, "channel_id": self.channel_id}
@classmethod
def from_group_console(cls, group: GroupConsole) -> "BroadcastTarget":
"""从 GroupConsole 对象创建"""
return cls(group_id=group.group_id, channel_id=group.channel_id)
@property
def key(self) -> str:
"""获取群组的唯一标识"""
if self.channel_id:
return f"{self.group_id}:{self.channel_id}"
return str(self.group_id)
class BroadcastTask:
"""广播任务"""
def __init__(
self,
bot_id: str,
message: UniMessage,
targets: list[BroadcastTarget],
scheduled_time: datetime | None = None,
task_id: str | None = None,
):
self.bot_id = bot_id
self.message = message
self.targets = targets
self.scheduled_time = scheduled_time
self.task_id = task_id
def to_dict(self) -> dict[str, Any]:
"""转换为字典格式,用于序列化"""
return {
"bot_id": self.bot_id,
"targets": [t.to_dict() for t in self.targets],
"scheduled_time": self.scheduled_time.isoformat()
if self.scheduled_time
else None,
"task_id": self.task_id,
}

View File

@ -0,0 +1,175 @@
import base64
import nonebot_plugin_alconna as alc
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_alconna.uniseg import Reference
from nonebot_plugin_alconna.uniseg.segment import CustomNode, Video
from zhenxun.services.log import logger
def uni_segment_to_v11_segment_dict(
seg: alc.Segment, depth: int = 0
) -> dict | list[dict] | None:
"""UniSeg段转V11字典"""
if isinstance(seg, alc.Text):
return {"type": "text", "data": {"text": seg.text}}
elif isinstance(seg, alc.Image):
if getattr(seg, "url", None):
return {
"type": "image",
"data": {"file": seg.url},
}
elif getattr(seg, "raw", None):
raw_data = seg.raw
if isinstance(raw_data, str):
if len(raw_data) >= 9 and raw_data[:9] == "base64://":
return {"type": "image", "data": {"file": raw_data}}
elif isinstance(raw_data, bytes):
b64_str = base64.b64encode(raw_data).decode()
return {"type": "image", "data": {"file": f"base64://{b64_str}"}}
else:
logger.warning(f"无法处理 Image.raw 的类型: {type(raw_data)}", "广播")
elif getattr(seg, "path", None):
logger.warning(
f"在合并转发中使用了本地图片路径,可能无法显示: {seg.path}", "广播"
)
return {"type": "image", "data": {"file": f"file:///{seg.path}"}}
else:
logger.warning(f"alc.Image 缺少有效数据,无法转换为 V11 段: {seg}", "广播")
elif isinstance(seg, alc.At):
return {"type": "at", "data": {"qq": seg.target}}
elif isinstance(seg, alc.AtAll):
return {"type": "at", "data": {"qq": "all"}}
elif isinstance(seg, Video):
if getattr(seg, "url", None):
return {
"type": "video",
"data": {"file": seg.url},
}
elif getattr(seg, "raw", None):
raw_data = seg.raw
if isinstance(raw_data, str):
if len(raw_data) >= 9 and raw_data[:9] == "base64://":
return {"type": "video", "data": {"file": raw_data}}
elif isinstance(raw_data, bytes):
b64_str = base64.b64encode(raw_data).decode()
return {"type": "video", "data": {"file": f"base64://{b64_str}"}}
else:
logger.warning(f"无法处理 Video.raw 的类型: {type(raw_data)}", "广播")
elif getattr(seg, "path", None):
logger.warning(
f"在合并转发中使用了本地视频路径,可能无法显示: {seg.path}", "广播"
)
return {"type": "video", "data": {"file": f"file:///{seg.path}"}}
else:
logger.warning(f"Video 缺少有效数据,无法转换为 V11 段: {seg}", "广播")
elif isinstance(seg, Reference) and getattr(seg, "nodes", None):
if depth >= 3:
logger.warning(
f"嵌套转发深度超过限制 (depth={depth}),不再继续解析", "广播"
)
return {"type": "text", "data": {"text": "[嵌套转发层数过多,内容已省略]"}}
nested_v11_content_list = []
nodes_list = getattr(seg, "nodes", [])
for node in nodes_list:
if isinstance(node, CustomNode):
node_v11_content = []
if isinstance(node.content, UniMessage):
for nested_seg in node.content:
converted_dict = uni_segment_to_v11_segment_dict(
nested_seg, depth + 1
)
if isinstance(converted_dict, list):
node_v11_content.extend(converted_dict)
elif converted_dict:
node_v11_content.append(converted_dict)
elif isinstance(node.content, str):
node_v11_content.append(
{"type": "text", "data": {"text": node.content}}
)
if node_v11_content:
separator = {
"type": "text",
"data": {
"text": f"\n--- 来自 {node.name} ({node.uid}) 的消息 ---\n"
},
}
nested_v11_content_list.insert(0, separator)
nested_v11_content_list.extend(node_v11_content)
nested_v11_content_list.append(
{"type": "text", "data": {"text": "\n---\n"}}
)
return nested_v11_content_list
else:
logger.warning(f"广播时跳过不支持的 UniSeg 段类型: {type(seg)}", "广播")
return None
def uni_message_to_v11_list_of_dicts(uni_msg: UniMessage | str | list) -> list[dict]:
"""UniMessage转V11字典列表"""
try:
if isinstance(uni_msg, str):
return [{"type": "text", "data": {"text": uni_msg}}]
if isinstance(uni_msg, list):
if not uni_msg:
return []
if all(isinstance(item, str) for item in uni_msg):
return [{"type": "text", "data": {"text": item}} for item in uni_msg]
result = []
for item in uni_msg:
if hasattr(item, "__iter__") and not isinstance(item, str | bytes):
result.extend(uni_message_to_v11_list_of_dicts(item))
elif hasattr(item, "text") and not isinstance(item, str | bytes):
text_value = getattr(item, "text", "")
result.append({"type": "text", "data": {"text": str(text_value)}})
elif hasattr(item, "url") and not isinstance(item, str | bytes):
url_value = getattr(item, "url", "")
if isinstance(item, Video):
result.append(
{"type": "video", "data": {"file": str(url_value)}}
)
else:
result.append(
{"type": "image", "data": {"file": str(url_value)}}
)
else:
try:
result.append({"type": "text", "data": {"text": str(item)}})
except Exception as e:
logger.warning(f"无法转换列表元素: {item}, 错误: {e}", "广播")
return result
except Exception as e:
logger.warning(f"消息转换过程中出错: {e}", "广播")
return [{"type": "text", "data": {"text": str(uni_msg)}}]
def custom_nodes_to_v11_nodes(custom_nodes: list[CustomNode]) -> list[dict]:
"""CustomNode列表转V11节点"""
v11_nodes = []
for node in custom_nodes:
v11_content_list = uni_message_to_v11_list_of_dicts(node.content)
if v11_content_list:
v11_nodes.append(
{
"type": "node",
"data": {
"user_id": str(node.uid),
"nickname": node.name,
"content": v11_content_list,
},
}
)
else:
logger.warning(
f"CustomNode (uid={node.uid}) 内容转换后为空,跳过此节点", "广播"
)
return v11_nodes

View File

@ -15,7 +15,8 @@ async def get_task() -> dict[str, str] | None:
return {
"name": "被动技能",
"description": "控制群组中的被动技能状态",
"usage": "通过 开启/关闭群被动 来控制群被动 <br> ---------- <br> "
"usage": "通过 开启/关闭群被动 来控制群被动 <br>"
+ " 示例:开启/关闭群被动早晚安 <br> ---------- <br> "
+ "<br>".join([task.name for task in task_list]),
}
return None

View File

@ -31,8 +31,7 @@ from .public import init_public
__plugin_meta__ = PluginMetadata(
name="WebUi",
description="WebUi API",
usage="""
""".strip(),
usage='"""\n """.strip(),',
extra=PluginExtraData(
author="HibiKier",
version="0.1",
@ -86,7 +85,6 @@ BaseApiRouter.include_router(system_router)
BaseApiRouter.include_router(menu_router)
BaseApiRouter.include_router(configure_router)
WsApiRouter = APIRouter(prefix="/zhenxun/socket")
WsApiRouter.include_router(ws_log_routes)
@ -97,6 +95,8 @@ WsApiRouter.include_router(chat_routes)
@PriorityLifecycle.on_startup(priority=0)
async def _():
try:
# 存储任务引用的列表,防止任务被垃圾回收
_tasks = []
async def log_sink(message: str):
loop = None
@ -107,7 +107,8 @@ async def _():
logger.warning("Web Ui log_sink", e=e)
if not loop:
loop = asyncio.new_event_loop()
loop.create_task(LOG_STORAGE.add(message.rstrip("\n"))) # noqa: RUF006
# 存储任务引用到外部列表中
_tasks.append(loop.create_task(LOG_STORAGE.add(message.rstrip("\n"))))
logger_.add(
log_sink, colorize=True, filter=default_filter, format=default_format

View File

@ -63,6 +63,7 @@ class MenuManager:
next(i for i, m in enumerate(default_menus) if m.module == module)
]
def get_menus(self):
return MenuData(menus=self.menu)

View File

@ -16,7 +16,7 @@ from zhenxun.utils.platform import PlatformUtils
from ....base_model import Result
from ....config import QueryDateType
from ....utils import authentication, get_system_status
from ....utils import authentication, clear_help_image, get_system_status
from .data_source import ApiDataSource
from .model import (
ActiveGroup,
@ -234,6 +234,7 @@ async def _(param: BotManageUpdateParam):
bot_data.block_plugins = CommonUtils.convert_module_format(param.block_plugins)
bot_data.block_tasks = CommonUtils.convert_module_format(param.block_tasks)
await bot_data.save(update_fields=["block_plugins", "block_tasks"])
clear_help_image()
return Result.ok()
except Exception as e:
logger.error(f"{router.prefix}/update_bot_manage 调用错误", "WebUi", e=e)

View File

@ -92,7 +92,7 @@ class ApiDataSource:
"""
version_file = Path() / "__version__"
if version_file.exists():
if text := version_file.open().read():
if text := version_file.open(encoding="utf-8").read():
return text.replace("__version__: ", "").strip()
return "unknown"

View File

@ -1,3 +1,5 @@
from datetime import datetime
from fastapi import APIRouter
import nonebot
from nonebot import on_message
@ -49,13 +51,14 @@ async def message_handle(
message: UniMsg,
group_id: str | None,
):
time = str(datetime.now().replace(microsecond=0))
messages = []
for m in message:
if isinstance(m, Text | str):
messages.append(MessageItem(type="text", msg=str(m)))
messages.append(MessageItem(type="text", msg=str(m), time=time))
elif isinstance(m, Image):
if m.url:
messages.append(MessageItem(type="img", msg=m.url))
messages.append(MessageItem(type="img", msg=m.url, time=time))
elif isinstance(m, At):
if group_id:
if m.target == "0":
@ -72,9 +75,9 @@ async def message_handle(
uname = group_user.user_name
if m.target not in ID2NAME[group_id]:
ID2NAME[group_id][m.target] = uname
messages.append(MessageItem(type="at", msg=f"@{uname}"))
messages.append(MessageItem(type="at", msg=f"@{uname}", time=time))
elif isinstance(m, Hyper):
messages.append(MessageItem(type="text", msg="[分享消息]"))
messages.append(MessageItem(type="text", msg="[分享消息]", time=time))
return messages

View File

@ -237,6 +237,8 @@ class MessageItem(BaseModel):
"""消息类型"""
msg: str
"""内容"""
time: str
"""发送日期"""
class Message(BaseModel):

View File

@ -6,13 +6,16 @@ from zhenxun.services.log import logger
from zhenxun.utils.enum import BlockType, PluginType
from ....base_model import Result
from ....utils import authentication
from ....utils import authentication, clear_help_image
from .data_source import ApiDataSource
from .model import (
BatchUpdatePlugins,
BatchUpdateResult,
PluginCount,
PluginDetail,
PluginInfo,
PluginSwitch,
RenameMenuTypePayload,
UpdatePlugin,
)
@ -30,9 +33,8 @@ async def _(
plugin_type: list[PluginType] = Query(None), menu_type: str | None = None
) -> Result[list[PluginInfo]]:
try:
return Result.ok(
await ApiDataSource.get_plugin_list(plugin_type, menu_type), "拿到信息啦!"
)
result = await ApiDataSource.get_plugin_list(plugin_type, menu_type)
return Result.ok(result, "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_plugin_list 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@ -78,6 +80,7 @@ async def _() -> Result[PluginCount]:
async def _(param: UpdatePlugin) -> Result:
try:
await ApiDataSource.update_plugin(param)
clear_help_image()
return Result.ok(info="已经帮你写好啦!")
except (ValueError, KeyError):
return Result.fail("插件数据不存在...")
@ -105,6 +108,7 @@ async def _(param: PluginSwitch) -> Result:
db_plugin.block_type = None
db_plugin.status = True
await db_plugin.save()
clear_help_image()
return Result.ok(info="成功改变了开关状态!")
except Exception as e:
logger.error(f"{router.prefix}/change_switch 调用错误", "WebUi", e=e)
@ -144,11 +148,68 @@ async def _() -> Result[list[str]]:
)
async def _(module: str) -> Result[PluginDetail]:
try:
return Result.ok(
await ApiDataSource.get_plugin_detail(module), "已经帮你写好啦!"
)
detail = await ApiDataSource.get_plugin_detail(module)
return Result.ok(detail, "已经帮你写好啦!")
except (ValueError, KeyError):
return Result.fail("插件数据不存在...")
except Exception as e:
logger.error(f"{router.prefix}/get_plugin 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.put(
"/plugins/batch_update",
dependencies=[authentication()],
response_model=Result[BatchUpdateResult],
response_class=JSONResponse,
summary="批量更新插件配置",
)
async def batch_update_plugin_config_api(
params: BatchUpdatePlugins,
) -> Result[BatchUpdateResult]:
"""批量更新插件配置,如开关、类型等"""
try:
result_dict = await ApiDataSource.batch_update_plugins(params=params)
result_model = BatchUpdateResult(
success=result_dict["success"],
updated_count=result_dict["updated_count"],
errors=result_dict["errors"],
)
clear_help_image()
return Result.ok(result_model, "插件配置更新完成")
except Exception as e:
logger.error(f"{router.prefix}/plugins/batch_update 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
# 新增:重命名菜单类型路由
@router.put(
"/menu_type/rename",
dependencies=[authentication()],
response_model=Result,
summary="重命名菜单类型",
)
async def rename_menu_type_api(payload: RenameMenuTypePayload) -> Result:
try:
result = await ApiDataSource.rename_menu_type(
old_name=payload.old_name, new_name=payload.new_name
)
if result.get("success"):
clear_help_image()
return Result.ok(
info=result.get(
"info",
f"成功将 {result.get('updated_count', 0)} 个插件的菜单类型从 "
f"'{payload.old_name}' 修改为 '{payload.new_name}'",
)
)
else:
return Result.fail(info=result.get("info", "重命名失败"))
except ValueError as ve:
return Result.fail(info=str(ve))
except RuntimeError as re:
logger.error(f"{router.prefix}/menu_type/rename 调用错误", "WebUi", e=re)
return Result.fail(info=str(re))
except Exception as e:
logger.error(f"{router.prefix}/menu_type/rename 调用错误", "WebUi", e=e)
return Result.fail(info=f"发生未知错误: {type(e).__name__}")

View File

@ -2,13 +2,20 @@ import re
import cattrs
from fastapi import Query
from tortoise.exceptions import DoesNotExist
from zhenxun.configs.config import Config
from zhenxun.configs.utils import ConfigGroup
from zhenxun.models.plugin_info import PluginInfo as DbPluginInfo
from zhenxun.utils.enum import BlockType, PluginType
from .model import PluginConfig, PluginDetail, PluginInfo, UpdatePlugin
from .model import (
BatchUpdatePlugins,
PluginConfig,
PluginDetail,
PluginInfo,
UpdatePlugin,
)
class ApiDataSource:
@ -44,6 +51,11 @@ class ApiDataSource:
level=plugin.level,
status=plugin.status,
author=plugin.author,
block_type=plugin.block_type,
is_builtin="builtin_plugins" in plugin.module_path
or plugin.plugin_type == PluginType.HIDDEN,
allow_setting=plugin.plugin_type != PluginType.HIDDEN,
allow_switch=plugin.plugin_type != PluginType.HIDDEN,
)
plugin_list.append(plugin_info)
return plugin_list
@ -69,7 +81,6 @@ class ApiDataSource:
db_plugin.block_type = param.block_type
db_plugin.status = param.block_type != BlockType.ALL
await db_plugin.save()
# 配置项
if param.configs and (configs := Config.get(param.module)):
for key in param.configs:
if c := configs.configs.get(key):
@ -80,6 +91,87 @@ class ApiDataSource:
Config.save(save_simple_data=True)
return db_plugin
@classmethod
async def batch_update_plugins(cls, params: BatchUpdatePlugins) -> dict:
"""批量更新插件数据
参数:
params: BatchUpdatePlugins
返回:
dict: 更新结果, 例如 {'success': True, 'updated_count': 5, 'errors': []}
"""
plugins_to_update_other_fields = []
other_update_fields = set()
updated_count = 0
errors = []
for item in params.updates:
try:
db_plugin = await DbPluginInfo.get(module=item.module)
plugin_changed_other = False
plugin_changed_block = False
if db_plugin.block_type != item.block_type:
db_plugin.block_type = item.block_type
db_plugin.status = item.block_type != BlockType.ALL
plugin_changed_block = True
if item.menu_type is not None and db_plugin.menu_type != item.menu_type:
db_plugin.menu_type = item.menu_type
other_update_fields.add("menu_type")
plugin_changed_other = True
if (
item.default_status is not None
and db_plugin.default_status != item.default_status
):
db_plugin.default_status = item.default_status
other_update_fields.add("default_status")
plugin_changed_other = True
if plugin_changed_block:
try:
await db_plugin.save(update_fields=["block_type", "status"])
updated_count += 1
except Exception as e_save:
errors.append(
{
"module": item.module,
"error": f"Save block_type failed: {e_save!s}",
}
)
plugin_changed_other = False
if plugin_changed_other:
plugins_to_update_other_fields.append(db_plugin)
except DoesNotExist:
errors.append({"module": item.module, "error": "Plugin not found"})
except Exception as e:
errors.append({"module": item.module, "error": str(e)})
bulk_updated_count = 0
if plugins_to_update_other_fields and other_update_fields:
try:
await DbPluginInfo.bulk_update(
plugins_to_update_other_fields, list(other_update_fields)
)
bulk_updated_count = len(plugins_to_update_other_fields)
except Exception as e_bulk:
errors.append(
{
"module": "batch_update_other",
"error": f"Bulk update failed: {e_bulk!s}",
}
)
return {
"success": len(errors) == 0,
"updated_count": updated_count + bulk_updated_count,
"errors": errors,
}
@classmethod
def __build_plugin_config(
cls, module: str, cfg: str, config: ConfigGroup
@ -115,6 +207,41 @@ class ApiDataSource:
type_inner=type_inner, # type: ignore
)
@classmethod
async def rename_menu_type(cls, old_name: str, new_name: str) -> dict:
"""重命名菜单类型,并更新所有相关插件
参数:
old_name: 旧菜单类型名称
new_name: 新菜单类型名称
返回:
dict: 更新结果, 例如 {'success': True, 'updated_count': 3}
"""
if not old_name or not new_name:
raise ValueError("旧名称和新名称都不能为空")
if old_name == new_name:
return {
"success": True,
"updated_count": 0,
"info": "新旧名称相同,无需更新",
}
# 检查新名称是否已存在(理论上前端会校验,后端再保险一次)
exists = await DbPluginInfo.filter(menu_type=new_name).exists()
if exists:
raise ValueError(f"新的菜单类型名称 '{new_name}' 已被其他插件使用")
try:
# 使用 filter().update() 进行批量更新
updated_count = await DbPluginInfo.filter(menu_type=old_name).update(
menu_type=new_name
)
return {"success": True, "updated_count": updated_count}
except Exception as e:
# 可以添加更详细的日志记录
raise RuntimeError(f"数据库更新菜单类型失败: {e!s}")
@classmethod
async def get_plugin_detail(cls, module: str) -> PluginDetail:
"""获取插件详情

View File

@ -1,6 +1,6 @@
from typing import Any
from pydantic import BaseModel
from pydantic import BaseModel, Field
from zhenxun.utils.enum import BlockType
@ -37,19 +37,19 @@ class UpdatePlugin(BaseModel):
module: str
"""模块"""
default_status: bool
"""默认开关"""
"""是否默认开启"""
limit_superuser: bool
"""限制超级用户"""
cost_gold: int
"""金币花费"""
menu_type: str
"""插件菜单类型"""
"""是否限制超级用户"""
level: int
"""插件所需群权限"""
"""等级"""
cost_gold: int
"""花费金币"""
menu_type: str
"""菜单类型"""
block_type: BlockType | None = None
"""禁用类型"""
configs: dict[str, Any] | None = None
"""配置项"""
"""置项"""
class PluginInfo(BaseModel):
@ -58,27 +58,33 @@ class PluginInfo(BaseModel):
"""
module: str
"""插件名称"""
"""模块"""
plugin_name: str
"""插件中文名称"""
"""插件名称"""
default_status: bool
"""默认开关"""
"""是否默认开启"""
limit_superuser: bool
"""限制超级用户"""
"""是否限制超级用户"""
level: int
"""等级"""
cost_gold: int
"""花费金币"""
menu_type: str
"""插件菜单类型"""
"""菜单类型"""
version: str
"""插件版本"""
level: int
"""群权限"""
"""版本"""
status: bool
"""当前状态"""
"""状态"""
author: str | None = None
"""作者"""
block_type: BlockType | None = None
"""禁用类型"""
block_type: BlockType | None = Field(None, description="插件禁用状态 (None: 启用)")
"""禁用状态"""
is_builtin: bool = False
"""是否为内置插件"""
allow_switch: bool = True
"""是否允许开关"""
allow_setting: bool = True
"""是否允许设置"""
class PluginConfig(BaseModel):
@ -86,20 +92,13 @@ class PluginConfig(BaseModel):
插件配置项
"""
module: str
"""模块"""
key: str
""""""
value: Any
""""""
help: str | None = None
"""帮助"""
default_value: Any
"""默认值"""
type: Any = None
"""值类型"""
type_inner: list[str] | None = None
"""List Tuple等内部类型检验"""
module: str = Field(..., description="模块名")
key: str = Field(..., description="")
value: Any = Field(None, description="")
help: str | None = Field(None, description="帮助信息")
default_value: Any = Field(None, description="默认值")
type: str | None = Field(None, description="类型")
type_inner: list[str] | None = Field(None, description="内部类型")
class PluginCount(BaseModel):
@ -117,6 +116,21 @@ class PluginCount(BaseModel):
"""其他插件"""
class BatchUpdatePluginItem(BaseModel):
module: str = Field(..., description="插件模块名")
default_status: bool | None = Field(None, description="默认状态(开关)")
menu_type: str | None = Field(None, description="菜单类型")
block_type: BlockType | None = Field(
None, description="插件禁用状态 (None: 启用, ALL: 禁用)"
)
class BatchUpdatePlugins(BaseModel):
updates: list[BatchUpdatePluginItem] = Field(
..., description="要批量更新的插件列表"
)
class PluginDetail(PluginInfo):
"""
插件详情
@ -125,6 +139,26 @@ class PluginDetail(PluginInfo):
config_list: list[PluginConfig]
class RenameMenuTypePayload(BaseModel):
old_name: str = Field(..., description="旧菜单类型名称")
new_name: str = Field(..., description="新菜单类型名称")
class PluginIr(BaseModel):
id: int
"""插件id"""
class BatchUpdateResult(BaseModel):
"""
批量更新插件结果
"""
success: bool = Field(..., description="是否全部成功")
"""是否全部成功"""
updated_count: int = Field(..., description="更新成功的数量")
"""更新成功的数量"""
errors: list[dict[str, str]] = Field(
default_factory=list, description="错误信息列表"
)
"""错误信息列表"""

View File

@ -31,20 +31,18 @@ async def _(path: str | None = None) -> Result[list[DirFile]]:
return Result.fail(error)
if not base_path:
return Result.fail("无效的路径")
data_list = []
for file in os.listdir(base_path):
file_path = base_path / file
is_image = any(file.endswith(f".{t}") for t in IMAGE_TYPE)
data_list.append(
DirFile(
is_file=not file_path.is_dir(),
is_image=is_image,
name=file,
parent=str(base_path.relative_to(Path().resolve()))
if path
else None,
)
data_list = []
for file in os.listdir(base_path):
file_path = base_path / file
is_image = any(file.endswith(f".{t}") for t in IMAGE_TYPE)
data_list.append(
DirFile(
is_file=not file_path.is_dir(),
is_image=is_image,
name=file,
parent=path,
size=None if file_path.is_dir() else file_path.stat().st_size,
mtime=file_path.stat().st_mtime,
)
return Result.ok(data_list)
except Exception as e:
@ -265,3 +263,13 @@ async def _(full_path: str) -> Result[str]:
return Result.ok(BuildImage.open(path).pic2bs4())
except Exception as e:
return Result.warning_(f"获取图片失败: {e!s}")
@router.get(
"/ping",
response_model=Result[str],
response_class=JSONResponse,
description="检查服务器状态",
)
async def _() -> Result[str]:
return Result.ok("pong")

View File

@ -14,6 +14,10 @@ class DirFile(BaseModel):
"""文件夹或文件名称"""
parent: str | None = None
"""父级"""
size: int | None = None
"""文件大小"""
mtime: float | None = None
"""修改时间"""
class DeleteFile(BaseModel):

View File

@ -12,7 +12,7 @@ import psutil
import ujson as json
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH
from .base_model import SystemFolderSize, SystemStatus, User
@ -67,6 +67,21 @@ def validate_path(path_str: str | None) -> tuple[Path | None, str | None]:
except Exception as e:
return None, f"路径验证失败: {e!s}"
GROUP_HELP_PATH = DATA_PATH / "group_help"
SIMPLE_HELP_IMAGE = IMAGE_PATH / "SIMPLE_HELP.png"
SIMPLE_DETAIL_HELP_IMAGE = IMAGE_PATH / "SIMPLE_DETAIL_HELP.png"
def clear_help_image():
"""清理帮助图片"""
if SIMPLE_HELP_IMAGE.exists():
SIMPLE_HELP_IMAGE.unlink()
if SIMPLE_DETAIL_HELP_IMAGE.exists():
SIMPLE_DETAIL_HELP_IMAGE.unlink()
for file in GROUP_HELP_PATH.iterdir():
if file.is_file():
file.unlink()
def get_user(uname: str) -> User | None:
"""获取账号密码

View File

@ -1,4 +1,4 @@
from typing import Any, overload
from typing import Any, cast, overload
from typing_extensions import Self
from tortoise import fields
@ -10,6 +10,42 @@ from zhenxun.services.db_context import Model
from zhenxun.utils.enum import PluginType
def add_disable_marker(name: str) -> str:
"""添加模块禁用标记符
Args:
name: 模块名称
Returns:
添加了禁用标记的模块名 (前缀'<'和后缀',')
"""
return f"<{name},"
@overload
def convert_module_format(data: str) -> list[str]: ...
@overload
def convert_module_format(data: list[str]) -> str: ...
def convert_module_format(data: str | list[str]) -> str | list[str]:
"""
`<aaa,<bbb,<ccc,` `["aaa", "bbb", "ccc"]` (即禁用启用)之间进行相互转换
参数:
data: 要转换的数据
返回:
str | list[str]: 根据输入类型返回转换后的数据
"""
if isinstance(data, str):
return [item.strip(",") for item in data.split("<") if item.strip()]
else:
return "".join(add_disable_marker(item) for item in data)
class GroupConsole(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
@ -51,33 +87,34 @@ class GroupConsole(Model):
table_description = "群组信息表"
unique_together = ("group_id", "channel_id")
@staticmethod
def format(name: str) -> str:
return f"<{name},"
@overload
@classmethod
def convert_module_format(cls, data: str) -> list[str]: ...
@overload
@classmethod
def convert_module_format(cls, data: list[str]) -> str: ...
@classmethod
def convert_module_format(cls, data: str | list[str]) -> str | list[str]:
"""
`<aaa,<bbb,<ccc,` `["aaa", "bbb", "ccc"]` 之间进行相互转换
参数:
data (str | list[str]): 输入数据可能是格式化字符串或字符串列表
async def _get_task_modules(cls, *, default_status: bool) -> list[str]:
"""获取默认禁用的任务模块
返回:
str | list[str]: 根据输入类型返回转换后的数据
list[str]: 任务模块列表
"""
if isinstance(data, str):
return [item.strip(",") for item in data.split("<") if item]
elif isinstance(data, list):
return "".join(cls.format(item) for item in data)
return cast(
list[str],
await TaskInfo.filter(default_status=default_status).values_list(
"module", flat=True
),
)
@classmethod
async def _get_plugin_modules(cls, *, default_status: bool) -> list[str]:
"""获取默认禁用的插件模块
返回:
list[str]: 插件模块列表
"""
return cast(
list[str],
await PluginInfo.filter(
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT],
default_status=default_status,
).values_list("module", flat=True),
)
@classmethod
async def create(
@ -85,20 +122,44 @@ class GroupConsole(Model):
) -> Self:
"""覆盖create方法"""
group = await super().create(using_db=using_db, **kwargs)
if modules := await TaskInfo.filter(default_status=False).values_list(
"module", flat=True
):
group.block_task = cls.convert_module_format(modules) # type: ignore
if modules := await PluginInfo.filter(
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT],
default_status=False,
).values_list("module", flat=True):
group.block_plugin = cls.convert_module_format(modules) # type: ignore
await group.save(
using_db=using_db, update_fields=["block_plugin", "block_task"]
)
task_modules = await cls._get_task_modules(default_status=False)
plugin_modules = await cls._get_plugin_modules(default_status=False)
if task_modules or plugin_modules:
await cls._update_modules(group, task_modules, plugin_modules, using_db)
return group
@classmethod
async def _update_modules(
cls,
group: Self,
task_modules: list[str],
plugin_modules: list[str],
using_db: BaseDBAsyncClient | None = None,
) -> None:
"""更新模块设置
参数:
group: 群组实例
task_modules: 任务模块列表
plugin_modules: 插件模块列表
using_db: 数据库连接
"""
update_fields = []
if task_modules:
group.block_task = convert_module_format(task_modules)
update_fields.append("block_task")
if plugin_modules:
group.block_plugin = convert_module_format(plugin_modules)
update_fields.append("block_plugin")
if update_fields:
await group.save(using_db=using_db, update_fields=update_fields)
@classmethod
async def get_or_create(
cls,
@ -110,20 +171,15 @@ class GroupConsole(Model):
group, is_create = await super().get_or_create(
defaults=defaults, using_db=using_db, **kwargs
)
if is_create and (
modules := await TaskInfo.filter(default_status=False).values_list(
"module", flat=True
)
):
group.block_task = cls.convert_module_format(modules) # type: ignore
if modules := await PluginInfo.filter(
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT],
default_status=False,
).values_list("module", flat=True):
group.block_plugin = cls.convert_module_format(modules) # type: ignore
await group.save(
using_db=using_db, update_fields=["block_plugin", "block_task"]
)
if not is_create:
return group, is_create
task_modules = await cls._get_task_modules(default_status=False)
plugin_modules = await cls._get_plugin_modules(default_status=False)
if task_modules or plugin_modules:
await cls._update_modules(group, task_modules, plugin_modules, using_db)
return group, is_create
@classmethod
@ -137,20 +193,15 @@ class GroupConsole(Model):
group, is_create = await super().update_or_create(
defaults=defaults, using_db=using_db, **kwargs
)
if is_create and (
modules := await TaskInfo.filter(default_status=False).values_list(
"module", flat=True
)
):
group.block_task = cls.convert_module_format(modules) # type: ignore
if modules := await PluginInfo.filter(
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT],
default_status=False,
).values_list("module", flat=True):
group.block_plugin = cls.convert_module_format(modules) # type: ignore
await group.save(
using_db=using_db, update_fields=["block_plugin", "block_task"]
)
if not is_create:
return group, is_create
task_modules = await cls._get_task_modules(default_status=False)
plugin_modules = await cls._get_plugin_modules(default_status=False)
if task_modules or plugin_modules:
await cls._update_modules(group, task_modules, plugin_modules, using_db)
return group, is_create
@classmethod
@ -195,7 +246,7 @@ class GroupConsole(Model):
"""
return await cls.exists(
group_id=group_id,
superuser_block_plugin__contains=f"<{module},",
superuser_block_plugin__contains=add_disable_marker(module),
)
@classmethod
@ -209,10 +260,11 @@ class GroupConsole(Model):
返回:
bool: 是否禁用插件
"""
module = add_disable_marker(module)
return await cls.exists(
group_id=group_id, block_plugin__contains=f"<{module},"
group_id=group_id, block_plugin__contains=module
) or await cls.exists(
group_id=group_id, superuser_block_plugin__contains=f"<{module},"
group_id=group_id, superuser_block_plugin__contains=module
)
@classmethod
@ -234,12 +286,22 @@ class GroupConsole(Model):
group, _ = await cls.get_or_create(
group_id=group_id, defaults={"platform": platform}
)
update_fields = []
if is_superuser:
if f"<{module}," not in group.superuser_block_plugin:
group.superuser_block_plugin += f"<{module},"
elif f"<{module}," not in group.block_plugin:
group.block_plugin += f"<{module},"
await group.save(update_fields=["block_plugin", "superuser_block_plugin"])
superuser_block_plugin = convert_module_format(group.superuser_block_plugin)
if module not in superuser_block_plugin:
superuser_block_plugin.append(module)
group.superuser_block_plugin = convert_module_format(
superuser_block_plugin
)
update_fields.append("superuser_block_plugin")
elif add_disable_marker(module) not in group.block_plugin:
block_plugin = convert_module_format(group.block_plugin)
block_plugin.append(module)
group.block_plugin = convert_module_format(block_plugin)
update_fields.append("block_plugin")
if update_fields:
await group.save(update_fields=update_fields)
@classmethod
async def set_unblock_plugin(
@ -260,14 +322,22 @@ class GroupConsole(Model):
group, _ = await cls.get_or_create(
group_id=group_id, defaults={"platform": platform}
)
update_fields = []
if is_superuser:
if f"<{module}," in group.superuser_block_plugin:
group.superuser_block_plugin = group.superuser_block_plugin.replace(
f"<{module},", ""
superuser_block_plugin = convert_module_format(group.superuser_block_plugin)
if module in superuser_block_plugin:
superuser_block_plugin.remove(module)
group.superuser_block_plugin = convert_module_format(
superuser_block_plugin
)
elif f"<{module}," in group.block_plugin:
group.block_plugin = group.block_plugin.replace(f"<{module},", "")
await group.save(update_fields=["block_plugin", "superuser_block_plugin"])
update_fields.append("superuser_block_plugin")
elif add_disable_marker(module) in group.block_plugin:
block_plugin = convert_module_format(group.block_plugin)
block_plugin.remove(module)
group.block_plugin = convert_module_format(block_plugin)
update_fields.append("block_plugin")
if update_fields:
await group.save(update_fields=update_fields)
@classmethod
async def is_normal_block_plugin(
@ -302,7 +372,7 @@ class GroupConsole(Model):
"""
return await cls.exists(
group_id=group_id,
superuser_block_task__contains=f"<{task},",
superuser_block_task__contains=add_disable_marker(task),
)
@classmethod
@ -319,22 +389,23 @@ class GroupConsole(Model):
返回:
bool: 是否禁用被动
"""
task = add_disable_marker(task)
if not channel_id:
return await cls.exists(
group_id=group_id,
channel_id__isnull=True,
block_task__contains=f"<{task},",
block_task__contains=task,
) or await cls.exists(
group_id=group_id,
channel_id__isnull=True,
superuser_block_task__contains=f"<{task},",
superuser_block_task__contains=task,
)
return await cls.exists(
group_id=group_id, channel_id=channel_id, block_task__contains=f"<{task},"
group_id=group_id, channel_id=channel_id, block_task__contains=task
) or await cls.exists(
group_id=group_id,
channel_id__isnull=True,
superuser_block_task__contains=f"<{task},",
superuser_block_task__contains=task,
)
@classmethod
@ -356,12 +427,20 @@ class GroupConsole(Model):
group, _ = await cls.get_or_create(
group_id=group_id, defaults={"platform": platform}
)
update_fields = []
if is_superuser:
if f"<{task}," not in group.superuser_block_task:
group.superuser_block_task += f"<{task},"
elif f"<{task}," not in group.block_task:
group.block_task += f"<{task},"
await group.save(update_fields=["block_task", "superuser_block_task"])
superuser_block_task = convert_module_format(group.superuser_block_task)
if task not in group.superuser_block_task:
superuser_block_task.append(task)
group.superuser_block_task = convert_module_format(superuser_block_task)
update_fields.append("superuser_block_task")
elif add_disable_marker(task) not in group.block_task:
block_task = convert_module_format(group.block_task)
block_task.append(task)
group.block_task = convert_module_format(block_task)
update_fields.append("block_task")
if update_fields:
await group.save(update_fields=update_fields)
@classmethod
async def set_unblock_task(
@ -382,14 +461,20 @@ class GroupConsole(Model):
group, _ = await cls.get_or_create(
group_id=group_id, defaults={"platform": platform}
)
update_fields = []
if is_superuser:
if f"<{task}," in group.superuser_block_task:
group.superuser_block_task = group.superuser_block_task.replace(
f"<{task},", ""
)
elif f"<{task}," in group.block_task:
group.block_task = group.block_task.replace(f"<{task},", "")
await group.save(update_fields=["block_task", "superuser_block_task"])
superuser_block_task = convert_module_format(group.superuser_block_task)
if task in superuser_block_task:
superuser_block_task.remove(task)
group.superuser_block_task = convert_module_format(superuser_block_task)
update_fields.append("superuser_block_task")
elif add_disable_marker(task) in group.block_task:
block_task = convert_module_format(group.block_task)
block_task.remove(task)
group.block_task = convert_module_format(block_task)
update_fields.append("block_task")
if update_fields:
await group.save(update_fields=update_fields)
@classmethod
def _run_script(cls):

View File

@ -60,27 +60,41 @@ class PluginInfo(Model):
table_description = "插件基本信息"
@classmethod
async def get_plugin(cls, load_status: bool = True, **kwargs) -> Self | None:
async def get_plugin(
cls, load_status: bool = True, filter_parent: bool = True, **kwargs
) -> Self | None:
"""获取插件列表
参数:
load_status: 加载状态.
filter_parent: 过滤父组件
返回:
Self | None: 插件
"""
if filter_parent:
return await cls.get_or_none(
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
)
return await cls.get_or_none(load_status=load_status, **kwargs)
@classmethod
async def get_plugins(cls, load_status: bool = True, **kwargs) -> list[Self]:
async def get_plugins(
cls, load_status: bool = True, filter_parent: bool = True, **kwargs
) -> list[Self]:
"""获取插件列表
参数:
load_status: 加载状态.
filter_parent: 过滤父组件
返回:
list[Self]: 插件列表
"""
if filter_parent:
return await cls.filter(
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
).all()
return await cls.filter(load_status=load_status, **kwargs).all()
@classmethod

View File

@ -64,8 +64,6 @@ class MessageUtils:
if isinstance(msg, str):
if msg.startswith("base64://"):
message_list.append(Image(raw=BytesIO(base64.b64decode(msg[9:]))))
elif msg.startswith("http://"):
message_list.append(Image(url=msg))
else:
message_list.append(Text(msg))
elif isinstance(msg, int | float):