zhenxun_bot/zhenxun/services/llm/memory.py
Rumio c9f0a8b9d9
Some checks failed
检查bot是否运行正常 / bot check (push) Has been cancelled
CodeQL Code Security Analysis / Analyze (${{ matrix.language }}) (none, python) (push) Has been cancelled
Sequential Lint and Type Check / ruff-call (push) Has been cancelled
Release Drafter / Update Release Draft (push) Has been cancelled
Force Sync to Aliyun / sync (push) Has been cancelled
Update Version / update-version (push) Has been cancelled
Sequential Lint and Type Check / pyright-call (push) Has been cancelled
♻️ refactor(llm): 重构 LLM 服务架构,引入中间件与组件化适配器 (#2073)
* ♻️ refactor(llm): 重构 LLM 服务架构,引入中间件与组件化适配器

- 【重构】LLM 服务核心架构:
    - 引入中间件管道,统一处理请求生命周期(重试、密钥选择、日志、网络请求)。
    - 适配器重构为组件化设计,分离配置映射、消息转换、响应解析和工具序列化逻辑。
    - 移除 `with_smart_retry` 装饰器,其功能由中间件接管。
    - 移除 `LLMToolExecutor`,工具执行逻辑集成到 `ToolInvoker`。
- 【功能】增强配置系统:
    - `LLMGenerationConfig` 采用组件化结构(Core, Reasoning, Visual, Output, Safety, ToolConfig)。
    - 新增 `GenConfigBuilder` 提供语义化配置构建方式。
    - 新增 `LLMEmbeddingConfig` 用于嵌入专用配置。
    - `CommonOverrides` 迁移并更新至新配置结构。
- 【功能】强化工具系统:
    - 引入 `ToolInvoker` 实现更灵活的工具执行,支持回调与结构化错误。
    - `function_tool` 装饰器支持动态 Pydantic 模型创建和依赖注入 (`ToolParam`, `RunContext`)。
    - 平台原生工具支持 (`GeminiCodeExecution`, `GeminiGoogleSearch`, `GeminiUrlContext`)。
- 【功能】高级生成与嵌入:
    - `generate_structured` 方法支持 In-Context Validation and Repair (IVR) 循环和 AutoCoT (思维链) 包装。
    - 新增 `embed_query` 和 `embed_documents` 便捷嵌入 API。
    - `OpenAIImageAdapter` 支持 OpenAI 兼容的图像生成。
    - `SmartAdapter` 实现模型名称智能路由。
- 【重构】消息与类型系统:
    - `LLMContentPart` 扩展支持更多模态和代码执行相关内容。
    - `LLMMessage` 和 `LLMResponse` 结构更新,支持 `content_parts` 和思维链签名。
    - 统一 `LLMErrorCode` 和用户友好错误消息,提供更详细的网络/代理错误提示。
    - `pyproject.toml` 移除 `bilireq`,新增 `json_repair`。
- 【优化】日志与调试:
    - 引入 `DebugLogOptions`,提供细粒度日志脱敏控制。
    - 增强日志净化器,处理更多敏感数据和长字符串。
- 【清理】删除废弃模块:
    - `zhenxun/services/llm/memory.py`
    - `zhenxun/services/llm/executor.py`
    - `zhenxun/services/llm/config/presets.py`
    - `zhenxun/services/llm/types/content.py`
    - `zhenxun/services/llm/types/enums.py`
    - `zhenxun/services/llm/tools/__init__.py`
    - `zhenxun/services/llm/tools/manager.py`

* 📦️ build(deps): 移除 bilireq 并添加 json_repair 依赖

* 🐛 (llm): 移除图片生成模型能力预检查

* ♻️ refactor(llm.session): 重构记忆系统以分离存储和策略

* 🐛 fix(reload_setting): 重载配置时清除LLM缓存

*  feat(llm): 支持结构化生成函数接收 UniMessage

*  feat(search): 为搜索功能默认启用 Gemini Google Search 工具

* 🚨 auto fix by pre-commit hooks

---------

Co-authored-by: webjoin111 <455457521@qq.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2025-12-14 20:27:02 +08:00

244 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
LLM 服务 - 会话记忆模块
定义了LLM会话记忆的存储、策略和处理接口。
"""
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Callable
from typing import Any
from pydantic import BaseModel, Field
from zhenxun.services.llm.types import LLMMessage
from zhenxun.services.log import logger
class AIConfig(BaseModel):
"""AI配置类 (为保持独立性而在此处保留一个副本,实际使用中可能来自更高层)"""
model: Any = None
default_embedding_model: Any = None
default_preserve_media_in_history: bool = False
tool_providers: list[Any] = Field(default_factory=list)
def __post_init__(self):
"""初始化后从配置中读取默认值"""
pass
class BaseMessageStore(ABC):
"""
底层存储接口 (DAO - Data Access Object)。
这是一个抽象基类,定义了消息数据最底层的 **持久化与检索 (CRUD)** 接口。
它只关心数据的存取,不涉及任何业务逻辑(如历史记录修剪)。
开发者如果希望将对话历史存储到 Redis、数据库或其他持久化后端
应当实现这个接口。
"""
@abstractmethod
async def get_messages(self, session_id: str) -> list[LLMMessage]:
"""
根据会话ID获取完整的消息列表。
"""
raise NotImplementedError
@abstractmethod
async def add_messages(self, session_id: str, messages: list[LLMMessage]) -> None:
"""追加消息"""
raise NotImplementedError
@abstractmethod
async def set_messages(self, session_id: str, messages: list[LLMMessage]) -> None:
"""
完全覆盖指定会话ID的消息列表。
主要用于历史记录修剪等场景。
"""
raise NotImplementedError
@abstractmethod
async def clear(self, session_id: str) -> None:
"""清空指定会话ID的所有消息数据。"""
raise NotImplementedError
class InMemoryMessageStore(BaseMessageStore):
"""
一个基于内存的 `BaseMessageStore` 实现。
它使用一个Python字典来存储所有会话的消息提供了最简单、最快速的存储方案。
这是框架的默认存储方式,实现了开箱即用。
注意:此实现是 **非持久化** 的,当应用程序重启时,所有对话历史都会丢失。
适用于测试、简单应用或不需要长期记忆的场景。
"""
def __init__(self):
self._data: dict[str, list[LLMMessage]] = defaultdict(list)
async def get_messages(self, session_id: str) -> list[LLMMessage]:
"""从内存字典中获取消息列表的副本。"""
return self._data.get(session_id, []).copy()
async def add_messages(self, session_id: str, messages: list[LLMMessage]) -> None:
"""向内存中的消息列表追加消息。"""
self._data[session_id].extend(messages)
async def set_messages(self, session_id: str, messages: list[LLMMessage]) -> None:
"""在内存中直接替换指定会话的消息列表。"""
self._data[session_id] = messages
async def clear(self, session_id: str) -> None:
"""从内存字典中删除指定会话的条目。"""
if session_id in self._data:
del self._data[session_id]
class BaseMemory(ABC):
"""
记忆系统上层逻辑基类 (Strategy Layer)。
此抽象基类定义了记忆系统的 **策略层** 接口。它负责对外提供统一的记忆操作
接口,并封装了具体的记忆管理策略,如历史记录的修剪、摘要生成等。
`AI` 会话客户端直接与此接口交互,而不关心底层的存储实现。
开发者可以通过实现此接口来创建自定义的记忆管理策略,例如:
- `SummarizationMemory`: 在历史记录过长时自动调用LLM生成摘要来压缩历史。
- `VectorStoreMemory`: 将对话历史向量化并存入向量数据库,实现长期记忆检索。
"""
@abstractmethod
async def get_history(self, session_id: str) -> list[LLMMessage]:
"""获取用于构建模型输入的完整历史消息列表。"""
raise NotImplementedError
async def add_message(self, session_id: str, message: LLMMessage) -> None:
"""向记忆中添加单条消息。默认实现是调用 `add_messages`。"""
await self.add_messages(session_id, [message])
@abstractmethod
async def add_messages(self, session_id: str, messages: list[LLMMessage]) -> None:
"""向记忆中添加多条消息,并可能触发内部的记忆管理策略(如修剪)。"""
raise NotImplementedError
@abstractmethod
async def clear_history(self, session_id: str) -> None:
"""清空指定会话的全部记忆。"""
raise NotImplementedError
class ChatMemory(BaseMemory):
"""
标准聊天记忆实现:组合 Store + 滑动窗口策略。
这是 `BaseMemory` 的默认实现,它通过组合一个 `BaseMessageStore` 实例来
完成实际的数据存储,并在此之上实现了一个简单的“滑动窗口”记忆修剪策略。
"""
def __init__(self, store: BaseMessageStore, max_messages: int = 50):
self.store = store
self._max_messages = max_messages
async def _trim_history(self, session_id: str) -> None:
"""
记忆修剪策略:确保历史记录不超过 `_max_messages` 条。
如果存在系统消息 (System Prompt),它将被永久保留在列表的第一位。
"""
history = await self.store.get_messages(session_id)
if len(history) <= self._max_messages:
return
has_system = history and history[0].role == "system"
new_history: list[LLMMessage] = []
if has_system:
keep_count = max(0, self._max_messages - 1)
new_history = [history[0], *history[-keep_count:]]
else:
new_history = history[-self._max_messages :]
await self.store.set_messages(session_id, new_history)
async def get_history(self, session_id: str) -> list[LLMMessage]:
"""直接从底层存储获取历史记录。"""
return await self.store.get_messages(session_id)
async def add_messages(self, session_id: str, messages: list[LLMMessage]) -> None:
"""添加消息到历史记录,并立即执行修剪策略。"""
await self.store.add_messages(session_id, messages)
await self._trim_history(session_id)
async def clear_history(self, session_id: str) -> None:
"""清空底层存储中的历史记录。"""
await self.store.clear(session_id)
class MemoryProcessor(ABC):
"""
记忆处理器接口 (Hook/Observer)。
这是一个扩展接口,允许开发者创建自定义的“记忆处理器”,以在记忆被修改后
执行额外的操作(“钩子”)。
当 `AI` 实例的记忆更新时,它会依次调用所有注册的 `MemoryProcessor`。
使用场景示例:
- `LoggingMemoryProcessor`: 将每一轮对话异步记录到外部日志系统。
- `SummarizationProcessor`: 在后台任务中检查对话长度,并在需要时生成摘要。
- `EntityExtractionProcessor`: 从对话中提取关键实体(如人名、地名)并存储。
"""
@abstractmethod
async def process(self, session_id: str, new_messages: list[LLMMessage]) -> None:
"""处理新添加到记忆中的消息。"""
pass
_default_memory_factory: Callable[[], BaseMemory] | None = None
def set_default_memory_backend(factory: Callable[[], BaseMemory]):
"""
设置全局默认记忆后端工厂,允许统一替换会话的记忆实现。
这是一个高级依赖注入函数,允许插件或项目在启动时用自定义的 `BaseMemory`
实现替换掉默认的 `ChatMemory(InMemoryMessageStore())`。
Args:
factory: 一个无参数的、返回 `BaseMemory` 实例的函数或类。
"""
global _default_memory_factory
_default_memory_factory = factory
def _get_default_memory() -> BaseMemory:
"""
[内部函数] 获取一个默认的记忆后端实例。
它会首先检查是否有通过 `set_default_memory_backend` 设置的全局工厂,
如果有,则使用该工厂创建实例;否则,返回一个标准的内存记忆实例。
"""
if _default_memory_factory:
logger.debug("使用自定义的默认记忆后端工厂构建实例。")
return _default_memory_factory()
logger.debug("未配置自定义记忆后端,使用默认的 ChatMemory。")
return ChatMemory(store=InMemoryMessageStore())
__all__ = [
"AIConfig",
"BaseMemory",
"BaseMessageStore",
"ChatMemory",
"InMemoryMessageStore",
"MemoryProcessor",
"_get_default_memory",
"set_default_memory_backend",
]