zhenxun_bot/zhenxun/utils/common_utils.py
Rumio 74a9f3a843
feat(core): 支持LLM多图片响应,增强UI主题皮肤系统及优化JSON/Markdown处理 (#2062)
- 【LLM服务】
  - `LLMResponse` 模型现在支持 `images: list[bytes]`,允许模型返回多张图片。
  - LLM适配器 (`base.py`, `gemini.py`) 和 API 层 (`api.py`, `service.py`) 已更新以处理多图片响应。
  - 响应验证逻辑已调整,以检查 `images` 列表而非单个 `image_bytes`。
- 【UI渲染服务】
  - 引入组件“皮肤”(variant)概念,允许为同一组件提供不同视觉风格。
  - 改进了 `manifest.json` 的加载、合并和缓存机制,支持基础清单与皮肤清单的递归合并。
  - `ThemeManager` 现在会缓存已加载的清单,并在主题重载时清除缓存。
  - 增强了资源解析器 (`ResourceResolver`),支持 `@` 命名空间路径和更健壮的相对路径处理。
  - 独立模板现在会继承主 Jinja 环境的过滤器。
- 【工具函数】
  - 引入 `dump_json_safely` 工具函数,用于更安全地序列化包含 Pydantic 模型、枚举等复杂类型的对象为 JSON。
  - LLM 服务中的请求体和缓存键生成已改用 `dump_json_safely`。
  - 优化了 `format_usage_for_markdown` 函数,改进了 Markdown 文本的格式化,确保块级元素前有正确换行,并正确处理段落内硬换行。

Co-authored-by: webjoin111 <455457521@qq.com>
2025-10-09 08:50:40 +08:00

141 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from typing import overload
from nonebot.adapters import Bot
from nonebot_plugin_uninfo import Session, SupportScope, Uninfo, get_interface
from zhenxun.configs.config import BotConfig
from zhenxun.models.ban_console import BanConsole
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
class CommonUtils:
@classmethod
async def task_is_block(
cls, session: Uninfo | Bot, module: str, group_id: str | None = None
) -> bool:
"""判断被动技能是否可以发送
参数:
module: 被动技能模块名
group_id: 群组id
返回:
bool: 是否可以发送
"""
if isinstance(session, Bot):
if interface := get_interface(session):
info = interface.basic_info()
if info["scope"] == SupportScope.qq_api:
logger.info("q官bot放弃所有被动技能发言...")
"""q官bot放弃所有被动技能发言"""
return False
if session.scene == SupportScope.qq_api:
"""q官bot放弃所有被动技能发言"""
logger.info("q官bot放弃所有被动技能发言...")
return False
if not group_id and isinstance(session, Session):
group_id = session.group.id if session.group else None
if task := await TaskInfo.get_or_none(module=module):
"""被动全局状态"""
if not task.status:
return True
if not await BotConsole.get_bot_status(session.self_id):
"""bot是否休眠"""
return True
block_tasks = await BotConsole.get_tasks(session.self_id, False)
if module in block_tasks:
"""bot是否禁用被动"""
return True
if group_id:
if await GroupConsole.is_block_task(group_id, module):
"""群组是否禁用被动"""
return True
if g := await GroupConsole.get_group(group_id=group_id):
"""群组权限是否小于0"""
if g.level < 0:
return True
if await BanConsole.is_ban(None, group_id):
"""群组是否被ban"""
return True
return False
@staticmethod
def format(name: str) -> str:
return f"<{name},"
@overload
@classmethod
def convert_module_format(cls, data: str) -> list[str]: ...
@overload
@classmethod
def convert_module_format(cls, data: list[str]) -> str: ...
@classmethod
def convert_module_format(cls, data: str | list[str]) -> str | list[str]:
"""
在 `<aaa,<bbb,<ccc,` 和 `["aaa", "bbb", "ccc"]` 之间进行相互转换。
参数:
data (str | list[str]): 输入数据,可能是格式化字符串或字符串列表。
返回:
str | list[str]: 根据输入类型返回转换后的数据。
"""
if isinstance(data, str):
return [item.strip(",") for item in data.split("<") if item]
elif isinstance(data, list):
return "".join(cls.format(item) for item in data)
class SqlUtils:
@classmethod
def random(cls, query, limit: int = 1) -> str:
db_class_name = BotConfig.get_sql_type()
if "postgres" in db_class_name or "sqlite" in db_class_name:
query = f"{query.sql()} ORDER BY RANDOM() LIMIT {limit};"
elif "mysql" in db_class_name:
query = f"{query.sql()} ORDER BY RAND() LIMIT {limit};"
else:
logger.warning(
f"Unsupported database type: {db_class_name}", query.__module__
)
return query
@classmethod
def add_column(
cls,
table_name: str,
column_name: str,
column_type: str,
default: str | None = None,
not_null: bool = False,
) -> str:
sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}"
if default:
sql += f" DEFAULT {default}"
if not_null:
sql += " NOT NULL"
return sql
def format_usage_for_markdown(text: str) -> str:
"""
智能地将Python多行字符串转换为适合Markdown渲染的格式。
- 在列表、标题等块级元素前自动插入换行,确保正确解析。
- 将段落内的单个换行符替换为Markdown的硬换行行尾加两个空格
- 保留两个或更多的连续换行符使其成为Markdown的段落分隔。
"""
if not text:
return ""
text = re.sub(r"([^\n])\n(\s*[-*] |\s*#+\s|\s*>)", r"\1\n\n\2", text)
text = re.sub(r"(?<!\n)\n(?!\n)", " \n", text)
return text