mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-14 21:52:56 +08:00
- 【LLM服务】 - `LLMResponse` 模型现在支持 `images: list[bytes]`,允许模型返回多张图片。 - LLM适配器 (`base.py`, `gemini.py`) 和 API 层 (`api.py`, `service.py`) 已更新以处理多图片响应。 - 响应验证逻辑已调整,以检查 `images` 列表而非单个 `image_bytes`。 - 【UI渲染服务】 - 引入组件“皮肤”(variant)概念,允许为同一组件提供不同视觉风格。 - 改进了 `manifest.json` 的加载、合并和缓存机制,支持基础清单与皮肤清单的递归合并。 - `ThemeManager` 现在会缓存已加载的清单,并在主题重载时清除缓存。 - 增强了资源解析器 (`ResourceResolver`),支持 `@` 命名空间路径和更健壮的相对路径处理。 - 独立模板现在会继承主 Jinja 环境的过滤器。 - 【工具函数】 - 引入 `dump_json_safely` 工具函数,用于更安全地序列化包含 Pydantic 模型、枚举等复杂类型的对象为 JSON。 - LLM 服务中的请求体和缓存键生成已改用 `dump_json_safely`。 - 优化了 `format_usage_for_markdown` 函数,改进了 Markdown 文本的格式化,确保块级元素前有正确换行,并正确处理段落内硬换行。 Co-authored-by: webjoin111 <455457521@qq.com>
141 lines
4.8 KiB
Python
141 lines
4.8 KiB
Python
import re
|
||
from typing import overload
|
||
|
||
from nonebot.adapters import Bot
|
||
from nonebot_plugin_uninfo import Session, SupportScope, Uninfo, get_interface
|
||
|
||
from zhenxun.configs.config import BotConfig
|
||
from zhenxun.models.ban_console import BanConsole
|
||
from zhenxun.models.bot_console import BotConsole
|
||
from zhenxun.models.group_console import GroupConsole
|
||
from zhenxun.models.task_info import TaskInfo
|
||
from zhenxun.services.log import logger
|
||
|
||
|
||
class CommonUtils:
|
||
@classmethod
|
||
async def task_is_block(
|
||
cls, session: Uninfo | Bot, module: str, group_id: str | None = None
|
||
) -> bool:
|
||
"""判断被动技能是否可以发送
|
||
|
||
参数:
|
||
module: 被动技能模块名
|
||
group_id: 群组id
|
||
|
||
返回:
|
||
bool: 是否可以发送
|
||
"""
|
||
if isinstance(session, Bot):
|
||
if interface := get_interface(session):
|
||
info = interface.basic_info()
|
||
if info["scope"] == SupportScope.qq_api:
|
||
logger.info("q官bot放弃所有被动技能发言...")
|
||
"""q官bot放弃所有被动技能发言"""
|
||
return False
|
||
if session.scene == SupportScope.qq_api:
|
||
"""q官bot放弃所有被动技能发言"""
|
||
logger.info("q官bot放弃所有被动技能发言...")
|
||
return False
|
||
if not group_id and isinstance(session, Session):
|
||
group_id = session.group.id if session.group else None
|
||
if task := await TaskInfo.get_or_none(module=module):
|
||
"""被动全局状态"""
|
||
if not task.status:
|
||
return True
|
||
if not await BotConsole.get_bot_status(session.self_id):
|
||
"""bot是否休眠"""
|
||
return True
|
||
block_tasks = await BotConsole.get_tasks(session.self_id, False)
|
||
if module in block_tasks:
|
||
"""bot是否禁用被动"""
|
||
return True
|
||
if group_id:
|
||
if await GroupConsole.is_block_task(group_id, module):
|
||
"""群组是否禁用被动"""
|
||
return True
|
||
if g := await GroupConsole.get_group(group_id=group_id):
|
||
"""群组权限是否小于0"""
|
||
if g.level < 0:
|
||
return True
|
||
if await BanConsole.is_ban(None, group_id):
|
||
"""群组是否被ban"""
|
||
return True
|
||
return False
|
||
|
||
@staticmethod
|
||
def format(name: str) -> str:
|
||
return f"<{name},"
|
||
|
||
@overload
|
||
@classmethod
|
||
def convert_module_format(cls, data: str) -> list[str]: ...
|
||
|
||
@overload
|
||
@classmethod
|
||
def convert_module_format(cls, data: list[str]) -> str: ...
|
||
|
||
@classmethod
|
||
def convert_module_format(cls, data: str | list[str]) -> str | list[str]:
|
||
"""
|
||
在 `<aaa,<bbb,<ccc,` 和 `["aaa", "bbb", "ccc"]` 之间进行相互转换。
|
||
|
||
参数:
|
||
data (str | list[str]): 输入数据,可能是格式化字符串或字符串列表。
|
||
|
||
返回:
|
||
str | list[str]: 根据输入类型返回转换后的数据。
|
||
"""
|
||
if isinstance(data, str):
|
||
return [item.strip(",") for item in data.split("<") if item]
|
||
elif isinstance(data, list):
|
||
return "".join(cls.format(item) for item in data)
|
||
|
||
|
||
class SqlUtils:
|
||
@classmethod
|
||
def random(cls, query, limit: int = 1) -> str:
|
||
db_class_name = BotConfig.get_sql_type()
|
||
if "postgres" in db_class_name or "sqlite" in db_class_name:
|
||
query = f"{query.sql()} ORDER BY RANDOM() LIMIT {limit};"
|
||
elif "mysql" in db_class_name:
|
||
query = f"{query.sql()} ORDER BY RAND() LIMIT {limit};"
|
||
else:
|
||
logger.warning(
|
||
f"Unsupported database type: {db_class_name}", query.__module__
|
||
)
|
||
return query
|
||
|
||
@classmethod
|
||
def add_column(
|
||
cls,
|
||
table_name: str,
|
||
column_name: str,
|
||
column_type: str,
|
||
default: str | None = None,
|
||
not_null: bool = False,
|
||
) -> str:
|
||
sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}"
|
||
if default:
|
||
sql += f" DEFAULT {default}"
|
||
if not_null:
|
||
sql += " NOT NULL"
|
||
return sql
|
||
|
||
|
||
def format_usage_for_markdown(text: str) -> str:
|
||
"""
|
||
智能地将Python多行字符串转换为适合Markdown渲染的格式。
|
||
- 在列表、标题等块级元素前自动插入换行,确保正确解析。
|
||
- 将段落内的单个换行符替换为Markdown的硬换行(行尾加两个空格)。
|
||
- 保留两个或更多的连续换行符,使其成为Markdown的段落分隔。
|
||
"""
|
||
if not text:
|
||
return ""
|
||
|
||
text = re.sub(r"([^\n])\n(\s*[-*] |\s*#+\s|\s*>)", r"\1\n\n\2", text)
|
||
|
||
text = re.sub(r"(?<!\n)\n(?!\n)", " \n", text)
|
||
|
||
return text
|