import copy
import re
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Only needed for annotations; keeps this logging helper importable
    # without pulling in the adapter package at runtime.
    from nonebot.adapters import Message, MessageSegment

# URI prefixes that mark a string as inline binary payload.
_DATA_PREFIXES = ("base64://", "data:image", "data:video", "data:audio")

# Inline `data:<mime>;base64,<payload>` blobs with at least 100 payload chars.
_INLINE_BASE64_RE = re.compile(r"(data:[^;]+;base64,)[A-Za-z0-9+/=\s]{100,}")


def _truncate_base64_string(value: str, threshold: int = 256) -> str:
    """Shorten an over-long base64 / data-URI / plain string for logging.

    Args:
        value: The candidate string. Non-string values are returned unchanged.
        threshold: Length above which a string starting with one of the known
            binary prefixes is replaced by a placeholder.

    Returns:
        A placeholder describing the omitted payload, an abbreviated
        head/tail preview for very long plain strings, or ``value`` itself.
    """
    if not isinstance(value, str):
        return value

    length = len(value)
    if length > threshold:
        prefix = next((p for p in _DATA_PREFIXES if value.startswith(p)), None)
        if prefix is not None:
            return f"[{prefix}_data_omitted_len={length}]"

    # The larger bound must be tested first; otherwise the `> 1000` branch
    # swallows every string and the 50-character preview is never produced.
    if length > 2000:
        return f"[long_string_omitted_len={length}] {value[:50]}...{value[-20:]}"
    if length > 1000:
        return f"[long_string_omitted_len={length}] {value[:20]}...{value[-20:]}"
    return value


def _truncate_vector_list(vector: list, threshold: int = 10) -> list:
    """Abbreviate a long list (typically an embedding vector) for log display.

    Lists longer than ``threshold`` keep their first and last three items
    around a marker string; anything else is returned unchanged.
    """
    if isinstance(vector, list) and len(vector) > threshold:
        return [*vector[:3], f"...({len(vector)} floats omitted)...", *vector[-3:]]
    return vector


def _recursive_sanitize_any(obj: Any) -> Any:
    """Recursively shorten long strings inside nested dicts and lists.

    Dicts and lists are rebuilt; strings go through
    ``_truncate_base64_string``; every other value is returned as-is.
    """
    if isinstance(obj, dict):
        return {k: _recursive_sanitize_any(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_recursive_sanitize_any(v) for v in obj]
    if isinstance(obj, str):
        return _truncate_base64_string(obj)
    return obj


def _sanitize_ui_html(html_string: str) -> str:
    """Shorten inline base64 data (fonts, images, ...) in debug HTML.

    Every ``data:<mime>;base64,<payload>`` occurrence whose payload is at
    least 100 characters long keeps its prefix and has the payload replaced
    by a length marker. Non-string input is returned unchanged.
    """
    if not isinstance(html_string, str):
        return html_string

    def replacer(match):
        prefix = match.group(1)
        original_len = len(match.group(0)) - len(prefix)
        return f"{prefix}[...base64_omitted_len={original_len}...]"

    return _INLINE_BASE64_RE.sub(replacer, html_string)


def _sanitize_nonebot_message(message: "Message") -> "Message":
    """Return a deep copy of ``message`` with media payloads shortened.

    For ``image``, ``record`` and ``video`` segments, a string ``file`` entry
    is passed through ``_truncate_base64_string``. The input is not modified.
    """
    sanitized_message = copy.deepcopy(message)
    for seg in sanitized_message:
        seg: MessageSegment
        if seg.type in ("image", "record", "video"):
            file_info = seg.data.get("file", "")
            if isinstance(file_info, str):
                seg.data["file"] = _truncate_base64_string(file_info)
    return sanitized_message


def _sanitize_chat_message(message: dict) -> None:
    """Shorten image URLs and reasoning details of one chat message in place."""
    images = message.get("images")
    if isinstance(images, list):
        for image_info in images:
            if not isinstance(image_info, dict):
                continue
            image_url = image_info.get("image_url")
            if isinstance(image_url, dict):
                image_url["url"] = _truncate_base64_string(image_url.get("url", ""))

    details = message.get("reasoning_details")
    if isinstance(details, list):
        for detail in details:
            if not isinstance(detail, dict):
                continue
            data = detail.get("data")
            if isinstance(data, str) and len(data) > 100:
                detail["data"] = f"[encrypted_data_omitted_len={len(data)}]"
            text = detail.get("text")
            if isinstance(text, str):
                detail["text"] = _truncate_base64_string(text, threshold=2000)


def _sanitize_data_item(item: dict) -> None:
    """Shorten the embedding vector and ``b64_json`` of one ``data`` entry in place."""
    embedding = item.get("embedding")
    if isinstance(embedding, list):
        item["embedding"] = _truncate_vector_list(embedding)
    b64_json = item.get("b64_json")
    if isinstance(b64_json, str) and len(b64_json) > 256:
        item["b64_json"] = f"[base64_json_omitted_len={len(b64_json)}]"


def _sanitize_input_item(item: dict) -> None:
    """Shorten ``input_image`` URLs inside one ``input`` entry in place."""
    content = item.get("content")
    if not isinstance(content, list):
        return
    for part in content:
        if isinstance(part, dict) and part.get("type") == "input_image":
            image_url = part.get("image_url")
            if isinstance(image_url, str):
                part["image_url"] = _truncate_base64_string(image_url)


def _sanitize_openai_response(response_json: dict) -> dict:
    """Return a log-friendly deep copy of an OpenAI-compatible response body.

    Handles three top-level keys when they hold lists: ``choices`` (message
    images and reasoning details), ``data`` (embeddings and ``b64_json``) and
    ``input`` (``input_image`` parts). Entries of an unexpected type are
    skipped instead of aborting the whole pass, so one malformed element no
    longer causes the full, unsanitized payload to be returned.

    Returns:
        The sanitized copy, or ``response_json`` itself if sanitizing fails
        unexpectedly (this helper must never break logging).
    """
    try:
        sanitized_json = copy.deepcopy(response_json)
        if not isinstance(sanitized_json, dict):
            return sanitized_json

        choices = sanitized_json.get("choices")
        if isinstance(choices, list):
            for choice in choices:
                if isinstance(choice, dict) and isinstance(
                    choice.get("message"), dict
                ):
                    _sanitize_chat_message(choice["message"])

        data_items = sanitized_json.get("data")
        if isinstance(data_items, list):
            for item in data_items:
                if isinstance(item, dict):
                    _sanitize_data_item(item)

        input_items = sanitized_json.get("input")
        if isinstance(input_items, list):
            for item in input_items:
                if isinstance(item, dict):
                    _sanitize_input_item(item)

        return sanitized_json
    except Exception:
        # Deliberate best-effort: fall back to the original payload.
        return response_json


# NOTE(review): `_sanitize_openai_request` begins at this point in the file,
# but its definition is cut off in the reviewed excerpt; it is intentionally
# not rewritten here and must be carried over unchanged.