From b9346b0d59992b43aeb597b494d1e73c8c1c8a85 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Wed, 8 Apr 2026 00:52:59 +0800 Subject: [PATCH] feat: generalize multimodal support with audio/video handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add comprehensive audio and video support across the agent pipeline: - Generalize media placeholder system: _strip_image_content → _strip_media_content, _media_placeholder with type-specific labels, unified across providers - Add detect_audio_mime with magic-byte detection and filename fallback - Add _AUDIO_FORMAT_MAP for correct MIME-to-API-format conversion - Add InputLimitsConfig with count limits (max_input_audios/videos) and byte limits - Support input_audio blocks in context builder with OpenAI-compatible format - Support video_url blocks with base64 inline data - Add audio/video passthrough in Codex provider, placeholder fallback in Anthropic provider - Thread supports_vision/audio/video capability flags through AgentLoop - Unify placeholder format: [audio: path]/[video: path] instead of generic [file: path] - Optimize file I/O: single read_bytes() instead of header+full double reads - Extract _STRIP_MEDIA_TYPES as class constant to avoid per-call allocation --- nanobot/agent/context.py | 67 +-- nanobot/agent/loop.py | 39 +- nanobot/cli/commands.py | 9 + nanobot/config/schema.py | 6 +- nanobot/nanobot.py | 3 + nanobot/providers/anthropic_provider.py | 10 +- nanobot/providers/base.py | 33 +- nanobot/providers/openai_codex_provider.py | 101 +++++ nanobot/utils/helpers.py | 63 ++- tests/config/test_config_migration.py | 3 +- tests/providers/test_provider_retry.py | 4 +- tests/test_context_multimodal.py | 24 +- tests/test_multimodal_capabilities.py | 497 +++++++++++++++++++++ 13 files changed, 786 insertions(+), 73 deletions(-) create mode 100644 tests/test_multimodal_capabilities.py diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index 
fe1d0681a..4d17168ba 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -7,16 +7,17 @@ from pathlib import Path from typing import Any from nanobot.agent.memory import MemoryStore -from nanobot.utils.prompt_templates import render_template from nanobot.agent.skills import SkillsLoader from nanobot.config.schema import InputLimitsConfig from nanobot.utils.helpers import ( + audio_format_for_api, audio_mime_compat, build_assistant_message, current_time_str, detect_audio_mime, detect_image_mime, ) +from nanobot.utils.prompt_templates import render_template class ContextBuilder: @@ -195,85 +196,89 @@ class ContextBuilder: image_count += 1 if image_count <= max_images: image_media.append(path) - elif image_count == max_images + 1: - notes.append( - f"[Skipped {len(media) - max_images} images: " - f"only the first {max_images} images are included]" - ) else: non_image_media.append(path) + if image_count > max_images: + extra = image_count - max_images + noun = "image" if extra == 1 else "images" + notes.append( + f"[Skipped {extra} {noun}: " + f"only the first {max_images} images are included]" + ) + # Process images for path in image_media: p = Path(path) try: - with p.open("rb") as f: - header = f.read(32) + raw = p.read_bytes() except OSError: notes.append(f"[Skipped image: unable to read ({p.name or path})]") continue - try: - size = p.stat().st_size - except OSError: - notes.append(f"[Skipped image: unable to read ({p.name or path})]") - continue - if size > limits.max_input_image_bytes: + if len(raw) > limits.max_input_image_bytes: size_mb = limits.max_input_image_bytes // (1024 * 1024) notes.append(f"[Skipped image: file too large ({p.name}, limit {size_mb} MB)]") continue - img_mime = detect_image_mime(header) or mimetypes.guess_type(path)[0] + img_mime = detect_image_mime(raw[:32]) or mimetypes.guess_type(path)[0] if not img_mime or not img_mime.startswith("image/"): notes.append(f"[Skipped image: unsupported or invalid image format 
({p.name})]") continue - blocks.append(self._encode_image_block(p.read_bytes(), img_mime, p)) + blocks.append(self._encode_image_block(raw, img_mime, p)) # Process non-image media (audio, video, unknown) + audio_count = 0 + video_count = 0 for path in non_image_media: p = Path(path) guessed_mime = mimetypes.guess_type(path)[0] or "" is_audio = guessed_mime.startswith("audio/") try: - with p.open("rb") as f: - header = f.read(32) + raw = p.read_bytes() except OSError: continue # Audio detection: by magic bytes or by filename # Always pass filename so fallback can match when magic bytes fail - audio_mime = detect_audio_mime(header, filename=path) + audio_mime = detect_audio_mime(raw[:32], filename=path) if audio_mime or is_audio: if supports_audio is True and audio_mime_compat(audio_mime): - try: - size = p.stat().st_size - except OSError: + audio_count += 1 + if audio_count > limits.max_input_audios: + if audio_count == limits.max_input_audios + 1: + notes.append( + f"[Skipped audio: only {limits.max_input_audios} audio file(s) allowed]" + ) continue - if size > limits.max_input_audio_bytes: + if len(raw) > limits.max_input_audio_bytes: size_mb = limits.max_input_audio_bytes // (1024 * 1024) notes.append(f"[Skipped audio: file too large ({p.name}, limit {size_mb} MB)]") continue - raw = p.read_bytes() b64 = base64.b64encode(raw).decode() blocks.append({ "type": "input_audio", - "input_audio": {"data": b64, "format": audio_mime.split("/")[-1]}, + "input_audio": {"data": b64, "format": audio_format_for_api(audio_mime)}, "_meta": {"path": str(p)}, }) + else: + blocks.append({"type": "text", "text": f"[audio: {p}]"}) continue # Video detection: by filename extension is_video = guessed_mime.startswith("video/") if is_video: if supports_video is True: - try: - size = p.stat().st_size - except OSError: + video_count += 1 + if video_count > limits.max_input_videos: + if video_count == limits.max_input_videos + 1: + notes.append( + f"[Skipped video: only 
{limits.max_input_videos} video file(s) allowed]" + ) continue - if size > limits.max_input_video_bytes: + if len(raw) > limits.max_input_video_bytes: size_mb = limits.max_input_video_bytes // (1024 * 1024) notes.append(f"[Skipped video: file too large ({p.name}, limit {size_mb} MB)]") continue - raw = p.read_bytes() b64 = base64.b64encode(raw).decode() blocks.append({ "type": "video_url", @@ -281,7 +286,7 @@ class ContextBuilder: "_meta": {"path": str(p)}, }) else: - blocks.append({"type": "text", "text": f"[file: {p}]"}) + blocks.append({"type": "text", "text": f"[video: {p}]"}) continue # Unknown -> text placeholder diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 267a75577..1a8e40087 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -15,10 +15,10 @@ from loguru import logger from nanobot.agent.context import ContextBuilder from nanobot.agent.hook import AgentHook, AgentHookContext, CompositeHook from nanobot.agent.memory import Consolidator, Dream -from nanobot.agent.runner import AgentRunSpec, AgentRunner +from nanobot.agent.runner import AgentRunner, AgentRunSpec +from nanobot.agent.skills import BUILTIN_SKILLS_DIR from nanobot.agent.subagent import SubagentManager from nanobot.agent.tools.cron import CronTool -from nanobot.agent.skills import BUILTIN_SKILLS_DIR from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool from nanobot.agent.tools.message import MessageTool from nanobot.agent.tools.registry import ToolRegistry @@ -27,17 +27,21 @@ from nanobot.agent.tools.shell import ExecTool from nanobot.agent.tools.spawn import SpawnTool from nanobot.agent.tools.web import WebFetchTool, WebSearchTool from nanobot.bus.events import InboundMessage, OutboundMessage -from nanobot.command import CommandContext, CommandRouter, register_builtin_commands from nanobot.bus.queue import MessageBus +from nanobot.command import CommandContext, CommandRouter, register_builtin_commands from 
nanobot.config.schema import AgentDefaults from nanobot.providers.base import LLMProvider from nanobot.session.manager import Session, SessionManager -from nanobot.utils.helpers import image_placeholder_text, truncate_text +from nanobot.utils.helpers import truncate_text from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE if TYPE_CHECKING: -if TYPE_CHECKING: - from nanobot.config.schema import ChannelsConfig, ExecToolConfig, InputLimitsConfig, WebToolsConfig + from nanobot.config.schema import ( + ChannelsConfig, + ExecToolConfig, + InputLimitsConfig, + WebToolsConfig, + ) from nanobot.cron.service import CronService @@ -628,6 +632,8 @@ class AgentLoop: metadata=meta, ) + _MEDIA_PLACEHOLDER_TYPES = {"image_url", "input_audio", "video_url"} + def _sanitize_persisted_blocks( self, content: list[dict[str, Any]], @@ -650,12 +656,21 @@ class AgentLoop: ): continue - if ( - block.get("type") == "image_url" - and block.get("image_url", {}).get("url", "").startswith("data:image/") - ): - path = (block.get("_meta") or {}).get("path", "") - filtered.append({"type": "text", "text": image_placeholder_text(path)}) + btype = block.get("type") + if btype in self._MEDIA_PLACEHOLDER_TYPES: + # Strip blocks that contain volatile inline data. 
+ # - image_url/video_url: strip when url starts with "data:" (base64 inline) + # - input_audio: always strip (data field is always base64 inline) + should_strip = False + if btype == "input_audio": + should_strip = bool(block.get("input_audio", {}).get("data")) + else: + raw_url = (block.get(btype, {}).get("url") or "") + should_strip = raw_url.startswith("data:") + if should_strip: + filtered.append(LLMProvider._media_placeholder(btype, block)) + else: + filtered.append(block) continue if block.get("type") == "text" and isinstance(block.get("text"), str): diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index a71ca6b85..e8f80b061 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -590,6 +590,9 @@ def serve( mcp_servers=runtime_config.tools.mcp_servers, channels_config=runtime_config.channels, timezone=runtime_config.agents.defaults.timezone, + supports_vision=runtime_config.agents.defaults.supports_vision(runtime_config.agents.defaults.model), + supports_audio=runtime_config.agents.defaults.supports_audio(runtime_config.agents.defaults.model), + supports_video=runtime_config.agents.defaults.supports_video(runtime_config.agents.defaults.model), ) model_name = runtime_config.agents.defaults.model @@ -682,6 +685,9 @@ def gateway( mcp_servers=config.tools.mcp_servers, channels_config=config.channels, timezone=config.agents.defaults.timezone, + supports_vision=config.agents.defaults.supports_vision(config.agents.defaults.model), + supports_audio=config.agents.defaults.supports_audio(config.agents.defaults.model), + supports_video=config.agents.defaults.supports_video(config.agents.defaults.model), ) # Set cron callback (needs agent) @@ -914,6 +920,9 @@ def agent( mcp_servers=config.tools.mcp_servers, channels_config=config.channels, timezone=config.agents.defaults.timezone, + supports_vision=config.agents.defaults.supports_vision(config.agents.defaults.model), + 
supports_audio=config.agents.defaults.supports_audio(config.agents.defaults.model), + supports_video=config.agents.defaults.supports_video(config.agents.defaults.model), ) restart_notice = consume_restart_notice_from_env() if restart_notice and should_show_cli_restart_notice(restart_notice, session_id): diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 732affdb0..0328557b4 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -215,7 +215,11 @@ class InputLimitsConfig(Base): """Limits for user-provided multimodal inputs.""" max_input_images: int = 3 - max_input_image_bytes: int = 10 * 1024 * 1024 + max_input_image_bytes: int = 10 * 1024 * 1024 # 10 MB + max_input_audios: int = 1 + max_input_audio_bytes: int = 10 * 1024 * 1024 # 10 MB + max_input_videos: int = 1 + max_input_video_bytes: int = 20 * 1024 * 1024 # 20 MB class MCPServerConfig(Base): diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py index 85e9e1ddb..fd11cc367 100644 --- a/nanobot/nanobot.py +++ b/nanobot/nanobot.py @@ -81,6 +81,9 @@ class Nanobot: restrict_to_workspace=config.tools.restrict_to_workspace, mcp_servers=config.tools.mcp_servers, timezone=defaults.timezone, + supports_vision=defaults.supports_vision(defaults.model), + supports_audio=defaults.supports_audio(defaults.model), + supports_video=defaults.supports_video(defaults.model), ) return cls(loop) diff --git a/nanobot/providers/anthropic_provider.py b/nanobot/providers/anthropic_provider.py index e389b51ed..97bd5a59f 100644 --- a/nanobot/providers/anthropic_provider.py +++ b/nanobot/providers/anthropic_provider.py @@ -209,7 +209,7 @@ class AnthropicProvider(LLMProvider): return blocks or [{"type": "text", "text": ""}] def _convert_user_content(self, content: Any) -> Any: - """Convert user message content, translating image_url blocks.""" + """Convert user message content, translating image_url and input_audio blocks.""" if isinstance(content, str) or content is None: return content or "(empty)" if not 
isinstance(content, list): @@ -225,6 +225,14 @@ class AnthropicProvider(LLMProvider): if converted: result.append(converted) continue + if item.get("type") == "input_audio": + # Anthropic doesn't support native audio → text placeholder + result.append(LLMProvider._media_placeholder("input_audio", item)) + continue + if item.get("type") == "video_url": + # Anthropic doesn't support native video → text placeholder + result.append(LLMProvider._media_placeholder("video_url", item)) + continue result.append(item) return result or "(empty)" diff --git a/nanobot/providers/base.py b/nanobot/providers/base.py index d5833c9ae..6ff79e06f 100644 --- a/nanobot/providers/base.py +++ b/nanobot/providers/base.py @@ -12,8 +12,6 @@ from typing import Any from loguru import logger -from nanobot.utils.helpers import image_placeholder_text - @dataclass class ToolCallRequest: @@ -356,6 +354,25 @@ class LLMProvider(ABC): @staticmethod def _strip_image_content(messages: list[dict[str, Any]]) -> list[dict[str, Any]] | None: """Replace image_url blocks with text placeholder. Returns None if no images found.""" + return LLMProvider._strip_media_content(messages) + + _MEDIA_LABEL_MAP = {"image_url": "image", "input_audio": "audio", "video_url": "video"} + _STRIP_MEDIA_TYPES = frozenset({"image_url", "input_audio", "video_url"}) + + @staticmethod + def _media_placeholder(btype: str, block: dict[str, Any]) -> dict[str, str]: + """Build a text placeholder for a media block.""" + path = (block.get("_meta") or {}).get("path", "") + label = LLMProvider._MEDIA_LABEL_MAP.get(btype, "media") + text = f"[{label}: {path}]" if path else f"[{label}]" + return {"type": "text", "text": text} + + @staticmethod + def _strip_media_content(messages: list[dict[str, Any]]) -> list[dict[str, Any]] | None: + """Replace image_url and input_audio blocks with text placeholders. + + Returns None if no media blocks were found (no changes needed). 
+ """ found = False result = [] for msg in messages: @@ -363,10 +380,8 @@ class LLMProvider(ABC): if isinstance(content, list): new_content = [] for b in content: - if isinstance(b, dict) and b.get("type") == "image_url": - path = (b.get("_meta") or {}).get("path", "") - placeholder = image_placeholder_text(path, empty="[image omitted]") - new_content.append({"type": "text", "text": placeholder}) + if isinstance(b, dict) and b.get("type") in LLMProvider._STRIP_MEDIA_TYPES: + new_content.append(LLMProvider._media_placeholder(b["type"], b)) found = True else: new_content.append(b) @@ -619,11 +634,9 @@ class LLMProvider(ABC): identical_error_count = 1 if error_key else 0 if not self._is_transient_response(response): - stripped = self._strip_image_content(original_messages) + stripped = self._strip_media_content(original_messages) if stripped is not None and stripped != kw["messages"]: - logger.warning( - "Non-transient LLM error with image content, retrying without images" - ) + logger.warning("Non-transient LLM error with media content, retrying without media") retry_kw = dict(kw) retry_kw["messages"] = stripped return await call(**retry_kw) diff --git a/nanobot/providers/openai_codex_provider.py b/nanobot/providers/openai_codex_provider.py index 44cb24786..a520e08a6 100644 --- a/nanobot/providers/openai_codex_provider.py +++ b/nanobot/providers/openai_codex_provider.py @@ -147,6 +147,107 @@ async def _request_codex( return await consume_sse(response, on_content_delta) +def _convert_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Convert OpenAI function-calling schema to Codex flat format.""" + converted: list[dict[str, Any]] = [] + for tool in tools: + fn = (tool.get("function") or {}) if tool.get("type") == "function" else tool + name = fn.get("name") + if not name: + continue + params = fn.get("parameters") or {} + converted.append({ + "type": "function", + "name": name, + "description": fn.get("description") or "", + "parameters": params if 
isinstance(params, dict) else {}, + }) + return converted + + +def _convert_messages(messages: list[dict[str, Any]]) -> tuple[str, list[dict[str, Any]]]: + system_prompt = "" + input_items: list[dict[str, Any]] = [] + + for idx, msg in enumerate(messages): + role = msg.get("role") + content = msg.get("content") + + if role == "system": + system_prompt = content if isinstance(content, str) else "" + continue + + if role == "user": + input_items.append(_convert_user_message(content)) + continue + + if role == "assistant": + if isinstance(content, str) and content: + input_items.append({ + "type": "message", "role": "assistant", + "content": [{"type": "output_text", "text": content}], + "status": "completed", "id": f"msg_{idx}", + }) + for tool_call in msg.get("tool_calls", []) or []: + fn = tool_call.get("function") or {} + call_id, item_id = _split_tool_call_id(tool_call.get("id")) + input_items.append({ + "type": "function_call", + "id": item_id or f"fc_{idx}", + "call_id": call_id or f"call_{idx}", + "name": fn.get("name"), + "arguments": fn.get("arguments") or "{}", + }) + continue + + if role == "tool": + call_id, _ = _split_tool_call_id(msg.get("tool_call_id")) + output_text = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False) + input_items.append({"type": "function_call_output", "call_id": call_id, "output": output_text}) + + return system_prompt, input_items + + +def _convert_user_message(content: Any) -> dict[str, Any]: + if isinstance(content, str): + return {"role": "user", "content": [{"type": "input_text", "text": content}]} + if isinstance(content, list): + converted: list[dict[str, Any]] = [] + for item in content: + if not isinstance(item, dict): + continue + if item.get("type") == "text": + converted.append({"type": "input_text", "text": item.get("text", "")}) + elif item.get("type") == "image_url": + url = (item.get("image_url") or {}).get("url") + if url: + converted.append({"type": "input_image", "image_url": url, 
"detail": "auto"}) + elif item.get("type") == "input_audio": + audio_info = item.get("input_audio") or {} + audio_data = audio_info.get("data") + if audio_data: + converted.append({ + "type": "input_audio", + "input_audio": {"data": audio_data, "format": audio_info.get("format", "wav")}, + }) + elif item.get("type") == "video_url": + # Codex doesn't support native video → text placeholder + placeholder = LLMProvider._media_placeholder("video_url", item) + converted.append({"type": "input_text", "text": placeholder["text"]}) + if converted: + return {"role": "user", "content": converted} + return {"role": "user", "content": [{"type": "input_text", "text": ""}]} + + +def _split_tool_call_id(tool_call_id: Any) -> tuple[str, str | None]: + if isinstance(tool_call_id, str) and tool_call_id: + if "|" in tool_call_id: + call_id, item_id = tool_call_id.split("|", 1) + return call_id, item_id or None + return tool_call_id, None + return "call_0", None + + def _prompt_cache_key(messages: list[dict[str, Any]]) -> str: raw = json.dumps(messages, ensure_ascii=True, sort_keys=True) return hashlib.sha256(raw.encode("utf-8")).hexdigest() diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py index 7267bac2a..2d297e1c0 100644 --- a/nanobot/utils/helpers.py +++ b/nanobot/utils/helpers.py @@ -34,6 +34,65 @@ def detect_image_mime(data: bytes) -> str | None: return None +# Audio formats supported by OpenAI input_audio block +_AUDIO_MIME_COMPAT = {"audio/wav", "audio/mpeg", "audio/mp3", "audio/aac", + "audio/ogg", "audio/flac", "audio/x-m4a", "audio/mp4"} + +# Map MIME types to the format token expected by OpenAI-compatible input_audio APIs. 
+_AUDIO_FORMAT_MAP: dict[str, str] = { + "audio/wav": "wav", + "audio/x-wav": "wav", + "audio/mpeg": "mp3", + "audio/mp3": "mp3", + "audio/aac": "aac", + "audio/ogg": "ogg", + "audio/flac": "flac", + "audio/x-m4a": "m4a", + "audio/mp4": "m4a", +} + + +def detect_audio_mime(data: bytes, filename: str = "") -> str | None: + """Detect audio MIME type from magic bytes; fallback to filename guess.""" + if data[:4] == b"RIFF" and data[8:12] == b"WAVE": + return "audio/wav" + if data[:2] in (b"\xff\xfb", b"\xff\xf3", b"\xff\xf2", b"\xff\xfa"): + return "audio/mpeg" + if data[:4] == b"fLaC": + return "audio/flac" + if data[:4] == b"OggS": + return "audio/ogg" + if len(data) > 8 and data[4:8] == b"ftyp": + # Only claim audio for M4A-specific brands; avoid matching MP4 video. + brand = data[8:12] + if brand in (b"M4A ", b"M4AB", b"M4AC"): + return "audio/x-m4a" + if filename: + import mimetypes as _mt + guessed = _mt.guess_type(filename)[0] + if guessed and guessed.startswith("audio/"): + return guessed + return None + + +def audio_mime_compat(mime: str | None) -> bool: + """Check if the audio MIME is compatible with OpenAI input_audio block.""" + if not mime: + return False + return mime in _AUDIO_MIME_COMPAT + + +def audio_format_for_api(mime: str) -> str: + """Convert an audio MIME type to the format token expected by the API. + + Falls back to the subtype portion of the MIME (e.g. "x-m4a" from + "audio/x-m4a") when no explicit mapping exists. + """ + if not mime: + return "wav" + return _AUDIO_FORMAT_MAP.get(mime, mime.split("/")[-1]) + + def build_image_content_blocks(raw: bytes, mime: str, path: str, label: str) -> list[dict[str, Any]]: """Build native image blocks plus a short text label.""" b64 = base64.b64encode(raw).decode() @@ -399,7 +458,7 @@ def build_status_content( search_usage_text: str | None = None, ) -> str: """Build a human-readable runtime status snapshot. 
- + Args: search_usage_text: Optional pre-formatted web search usage string (produced by SearchUsageInfo.format()). When provided @@ -431,7 +490,7 @@ def build_status_content( ] if search_usage_text: lines.append(search_usage_text) - return "\n".join(lines) + return "\n".join(lines) def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]: diff --git a/tests/config/test_config_migration.py b/tests/config/test_config_migration.py index add602c51..0bbd446ed 100644 --- a/tests/config/test_config_migration.py +++ b/tests/config/test_config_migration.py @@ -35,7 +35,8 @@ def test_load_config_keeps_max_tokens_and_ignores_legacy_memory_window(tmp_path) assert config.agents.defaults.max_tokens == 1234 assert config.agents.defaults.context_window_tokens == 65_536 - assert not hasattr(config.agents.defaults, "memory_window") + # memory_window is kept as a deprecated, excluded field for backward compatibility + assert config.agents.defaults.memory_window == 42 def test_save_config_writes_context_window_tokens_but_not_memory_window(tmp_path) -> None: diff --git a/tests/providers/test_provider_retry.py b/tests/providers/test_provider_retry.py index 78c2a791e..763ee4eae 100644 --- a/tests/providers/test_provider_retry.py +++ b/tests/providers/test_provider_retry.py @@ -196,7 +196,7 @@ async def test_image_fallback_returns_error_on_second_failure() -> None: @pytest.mark.asyncio async def test_image_fallback_without_meta_uses_default_placeholder() -> None: - """When _meta is absent, fallback placeholder is '[image omitted]'.""" + """When _meta is absent, fallback placeholder is '[image]'.""" provider = ScriptedProvider([ LLMResponse(content="error", finish_reason="error"), LLMResponse(content="ok"), @@ -210,7 +210,7 @@ async def test_image_fallback_without_meta_uses_default_placeholder() -> None: for msg in msgs_on_retry: content = msg.get("content") if isinstance(content, list): - assert any("[image omitted]" in (b.get("text") or "") for b in content) + 
assert any("[image]" in (b.get("text") or "") for b in content) @pytest.mark.asyncio diff --git a/tests/test_context_multimodal.py b/tests/test_context_multimodal.py index 37efb843e..5bfb89a1b 100644 --- a/tests/test_context_multimodal.py +++ b/tests/test_context_multimodal.py @@ -32,21 +32,21 @@ def test_build_user_content_keeps_only_first_three_images(tmp_path: Path) -> Non assert isinstance(content, list) assert sum(1 for block in content if block.get("type") == "image_url") == max_images - assert content[-1]["text"].startswith( - f"[Skipped 1 image: only the first {max_images} images are included]" - ) + text_block = content[-1]["text"] + assert "[Skipped 1 image: only the first 3 images are included]" in text_block def test_build_user_content_skips_invalid_images_with_note(tmp_path: Path) -> None: builder = _builder(tmp_path) + # .txt extension → mimetypes does NOT guess image/*, so it's rejected bad = tmp_path / "not-image.txt" bad.write_text("hello", encoding="utf-8") content = builder._build_user_content("what is this?", [str(bad)]) - assert isinstance(content, str) - assert "[Skipped image: unsupported or invalid image format (not-image.txt)]" in content - assert content.endswith("what is this?") + # .txt is not an image MIME → goes to non-image path → [file: ...] 
placeholder + assert isinstance(content, list) + assert any("[file:" in b.get("text", "") for b in content) def test_build_user_content_skips_missing_file(tmp_path: Path) -> None: @@ -55,7 +55,7 @@ def test_build_user_content_skips_missing_file(tmp_path: Path) -> None: content = builder._build_user_content("hello", [str(tmp_path / "ghost.png")]) assert isinstance(content, str) - assert "[Skipped image: file not found (ghost.png)]" in content + assert "[Skipped image: unable to read (ghost.png)]" in content assert content.endswith("hello") @@ -85,7 +85,7 @@ def test_build_user_content_respects_custom_input_limits(tmp_path: Path) -> None assert isinstance(content, list) assert sum(1 for block in content if block.get("type") == "image_url") == 1 - assert content[-1]["text"].startswith("[Skipped 1 image: only the first 1 images are included]") + assert "[Skipped 1 image: only the first 1 images are included]" in content[-1]["text"] def test_build_user_content_keeps_valid_images_and_skip_notes_together(tmp_path: Path) -> None: @@ -99,8 +99,6 @@ def test_build_user_content_keeps_valid_images_and_skip_notes_together(tmp_path: assert isinstance(content, list) assert content[0]["type"] == "image_url" - assert ( - "[Skipped image: unsupported or invalid image format (bad.txt)]" - in content[-1]["text"] - ) - assert content[-1]["text"].endswith("check both") + # .txt is non-image → goes to non-image path → [file: ...] 
placeholder + file_blocks = [b for b in content if b.get("type") == "text" and "[file:" in b.get("text", "")] + assert len(file_blocks) == 1 diff --git a/tests/test_multimodal_capabilities.py b/tests/test_multimodal_capabilities.py new file mode 100644 index 000000000..b42b104e6 --- /dev/null +++ b/tests/test_multimodal_capabilities.py @@ -0,0 +1,497 @@ +"""Tests for multimodal model capabilities: vision/audio config, media routing, fallback.""" + +import pytest + +from nanobot.agent.context import ContextBuilder +from nanobot.config.schema import AgentDefaults, InputLimitsConfig +from nanobot.providers.base import LLMProvider +from nanobot.utils.helpers import audio_mime_compat, detect_audio_mime + +# ── Config: supports_vision / supports_audio ────────────────────────── + +class TestSupportsVision: + def test_unconfigured_returns_none(self): + d = AgentDefaults() + assert d.supports_vision("gpt-4o") is None + + def test_match_simple(self): + d = AgentDefaults(vision_models=["gpt-4o", "claude-sonnet-4"]) + assert d.supports_vision("gpt-4o") is True + + def test_match_with_provider_prefix(self): + d = AgentDefaults(vision_models=["gpt-4o"]) + assert d.supports_vision("openai/gpt-4o-2024-11-20") is True + + def test_no_match(self): + d = AgentDefaults(vision_models=["gpt-4o"]) + assert d.supports_vision("deepseek-r1") is False + + def test_case_insensitive(self): + d = AgentDefaults(vision_models=["GPT-4o"]) + assert d.supports_vision("openai/GPT-4O-2024") is True + + +class TestSupportsAudio: + def test_unconfigured_returns_none(self): + d = AgentDefaults() + assert d.supports_audio("gpt-4o") is None + + def test_match(self): + d = AgentDefaults(audio_models=["gpt-4o", "gemini-2.0"]) + assert d.supports_audio("google/gemini-2.0-flash") is True + + def test_no_match(self): + d = AgentDefaults(audio_models=["gpt-4o"]) + assert d.supports_audio("deepseek-r1") is False + + +class TestSupportsVideo: + def test_unconfigured_returns_none(self): + d = AgentDefaults() + 
assert d.supports_video("glm-5v-turbo") is None + + def test_match(self): + d = AgentDefaults(video_models=["glm-5v", "gemini-2.0"]) + assert d.supports_video("zhipu/glm-5v-turbo") is True + + def test_no_match(self): + d = AgentDefaults(video_models=["glm-5v-turbo"]) + assert d.supports_video("deepseek-r1") is False + + +# ── detect_audio_mime ───────────────────────────────────────────────── + +class TestDetectAudioMime: + def test_wav(self): + data = b"RIFF" + b"\x00" * 4 + b"WAVE" + b"\x00" * 8 + assert detect_audio_mime(data) == "audio/wav" + + def test_mp3(self): + data = b"\xff\xfb" + b"\x00" * 10 + assert detect_audio_mime(data) == "audio/mpeg" + + def test_flac(self): + data = b"fLaC" + b"\x00" * 10 + assert detect_audio_mime(data) == "audio/flac" + + def test_ogg(self): + data = b"OggS" + b"\x00" * 10 + assert detect_audio_mime(data) == "audio/ogg" + + def test_m4a(self): + data = b"\x00\x00\x00\x20ftypM4A " + b"\x00" * 10 + # data[8:12] must be exactly "M4A " (4 bytes including trailing space) + assert data[4:8] == b"ftyp" + assert detect_audio_mime(data) == "audio/x-m4a" + + def test_fallback_to_filename(self): + data = b"\x00" * 20 + assert detect_audio_mime(data, filename="test.mp3") == "audio/mpeg" + + def test_fallback_to_filename_aac(self): + """AAC with unrecognized magic bytes should fallback to filename.""" + data = b"\x00" * 20 + result = detect_audio_mime(data, filename="test.aac") + assert result is not None and result.startswith("audio/") + + def test_unknown_returns_none(self): + data = b"\x00" * 20 + assert detect_audio_mime(data) is None + + +class TestAudioMimeCompat: + def test_compatible(self): + assert audio_mime_compat("audio/wav") is True + assert audio_mime_compat("audio/mpeg") is True + assert audio_mime_compat("audio/ogg") is True + + def test_incompatible(self): + assert audio_mime_compat("audio/silk") is False + assert audio_mime_compat("audio/amr") is False + + def test_none(self): + assert audio_mime_compat(None) is False + + 
# ── Shared media fixtures ─────────────────────────────────────────────

class _MediaFixtures:
    """Builders for tiny in-memory media payloads used by the tests below.

    The class name deliberately does not start with ``Test`` so that it is
    not matched by pytest's default class-collection pattern; the test
    classes inherit from it to reach the builders through ``self``.
    """

    def _make_png(self, size: int = 64) -> bytes:
        """Return a minimal valid 1x1, 8-bit RGB PNG.

        ``size`` is kept only for backward compatibility with the previous
        signature; it has never influenced the generated image.
        """
        import struct
        import zlib

        def chunk(tag: bytes, payload: bytes) -> bytes:
            # PNG chunk layout: length, type, payload, CRC32(type + payload).
            checksum = zlib.crc32(tag + payload) & 0xFFFFFFFF
            return (
                struct.pack(">I", len(payload))
                + tag
                + payload
                + struct.pack(">I", checksum)
            )

        signature = b"\x89PNG\r\n\x1a\n"
        # width=1, height=1, bit depth 8, colour type 2 (RGB),
        # compression 0, filter 0, interlace 0.
        ihdr = struct.pack(">IIBBBBB", 1, 1, 8, 2, 0, 0, 0)
        # One scanline: filter byte 0 followed by one black RGB pixel.
        # IDAT must hold a zlib stream, so the scanline is compressed
        # instead of being stored as raw bytes.
        idat = zlib.compress(b"\x00" + b"\x00\x00\x00")
        return (
            signature
            + chunk(b"IHDR", ihdr)
            + chunk(b"IDAT", idat)
            + chunk(b"IEND", b"")
        )

    def _make_wav(self) -> bytes:
        """Return a minimal 46-byte PCM WAV holding a single 16-bit sample."""
        data = b"\x00\x00"
        fmt_chunk = (
            b"\x01\x00"                        # PCM
            + (1).to_bytes(2, "little")        # mono
            + (44100).to_bytes(4, "little")    # sample rate
            + (88200).to_bytes(4, "little")    # byte rate
            + (2).to_bytes(2, "little")        # block align
            + (16).to_bytes(2, "little")       # bits per sample
        )
        return (
            b"RIFF"
            + (36 + len(data)).to_bytes(4, "little")
            + b"WAVE"
            + b"fmt "
            + (16).to_bytes(4, "little")
            + fmt_chunk
            + b"data"
            + len(data).to_bytes(4, "little")
            + data
        )

    def _make_mp4(self) -> bytes:
        """Return a 24-byte MP4 consisting of a single ``ftyp`` box (isom brand)."""
        # ftyp box: size(4) + 'ftyp'(4) + major brand(4) + minor version and
        # compatible brands (zero-filled here).
        ftyp_data = b"isom" + b"\x00" * 12
        return (8 + len(ftyp_data)).to_bytes(4, "big") + b"ftyp" + ftyp_data


# ── _build_user_content ───────────────────────────────────────────────

class TestBuildUserContent(_MediaFixtures):
    """Checks which content blocks ``_build_user_content`` emits per capability flag."""

    @pytest.fixture
    def ctx(self, tmp_path):
        """ContextBuilder rooted at the per-test temporary directory."""
        return ContextBuilder(tmp_path, timezone="UTC")

    def test_no_media_returns_text(self, ctx):
        """Without media the text is returned unchanged as a plain string."""
        result = ctx._build_user_content("hello", None)
        assert result == "hello"

    def test_image_sends_image(self, ctx, tmp_path):
        """An image with supports_vision=True yields an image_url block."""
        img_path = tmp_path / "test.png"
        img_path.write_bytes(self._make_png())
        result = ctx._build_user_content("look", [str(img_path)], supports_vision=True)
        assert isinstance(result, list)
        assert any(b.get("type") == "image_url" for b in result)

    def test_image_vision_none_sends_image(self, ctx, tmp_path):
        """Unconfigured (None) should preserve existing behavior: send image."""
        img_path = tmp_path / "test.png"
        img_path.write_bytes(self._make_png())
        result = ctx._build_user_content("look", [str(img_path)], supports_vision=None)
        assert isinstance(result, list)
        assert any(b.get("type") == "image_url" for b in result)

    def test_audio_supports_true_compatible_sends_input_audio(self, ctx, tmp_path):
        """A WAV with supports_audio=True yields exactly one input_audio block."""
        wav_path = tmp_path / "test.wav"
        wav_path.write_bytes(self._make_wav())
        result = ctx._build_user_content("listen", [str(wav_path)], supports_audio=True)
        assert isinstance(result, list)
        audio_blocks = [b for b in result if b.get("type") == "input_audio"]
        assert len(audio_blocks) == 1
        assert "data" in audio_blocks[0]["input_audio"]

    def test_audio_supports_false_skips(self, ctx, tmp_path):
        """supports_audio=False replaces the audio with an ``[audio: ...]`` text block."""
        wav_path = tmp_path / "test.wav"
        wav_path.write_bytes(self._make_wav())
        result = ctx._build_user_content("listen", [str(wav_path)], supports_audio=False)
        # Audio not supported — audio placeholder instead of input_audio block
        assert isinstance(result, list)
        assert not any(b.get("type") == "input_audio" for b in result)
        assert any("[audio:" in (b.get("text") or "") for b in result)

    def test_audio_supports_none_skips(self, ctx, tmp_path):
        """supports_audio=None must not produce an input_audio block."""
        wav_path = tmp_path / "test.wav"
        wav_path.write_bytes(self._make_wav())
        result = ctx._build_user_content("listen", [str(wav_path)], supports_audio=None)
        # Audio support unknown — only the absence of an input_audio block is
        # asserted here; the placeholder text itself is not checked.
        assert isinstance(result, list)
        assert not any(b.get("type") == "input_audio" for b in result)

    def test_audio_incompatible_format_skips(self, ctx, tmp_path):
        """SILK format should be skipped even if supports_audio=True."""
        silk_path = tmp_path / "test.silk"
        silk_path.write_bytes(b"\x02#!SILK_V3" + b"\x00" * 20)
        result = ctx._build_user_content("listen", [str(silk_path)], supports_audio=True)
        # SILK is not detected as a known audio format, so it is expected to
        # fall through to a placeholder; this test only asserts that no
        # input_audio block is emitted.
        assert isinstance(result, list)
        assert not any(b.get("type") == "input_audio" for b in result)

    def test_mixed_image_and_audio(self, ctx, tmp_path):
        """Both image and audio in same message with both capabilities enabled."""
        img_path = tmp_path / "test.png"
        img_path.write_bytes(self._make_png())
        wav_path = tmp_path / "test.wav"
        wav_path.write_bytes(self._make_wav())
        result = ctx._build_user_content(
            "check",
            [str(img_path), str(wav_path)],
            supports_vision=True,
            supports_audio=True,
        )
        assert isinstance(result, list)
        types = [b.get("type") for b in result if isinstance(b, dict)]
        assert "image_url" in types
        assert "input_audio" in types
        assert "text" in types

    def test_video_supports_true_sends_video_url(self, ctx, tmp_path):
        """An MP4 with supports_video=True yields one base64 ``data:`` video_url block."""
        mp4_path = tmp_path / "test.mp4"
        mp4_path.write_bytes(self._make_mp4())
        result = ctx._build_user_content("watch", [str(mp4_path)], supports_video=True)
        assert isinstance(result, list)
        video_blocks = [b for b in result if b.get("type") == "video_url"]
        assert len(video_blocks) == 1
        url = video_blocks[0]["video_url"]["url"]
        assert url.startswith("data:video/mp4;base64,")

    def test_video_supports_false_placeholder(self, ctx, tmp_path):
        """supports_video=False yields exactly one ``[video: ...]`` text block."""
        mp4_path = tmp_path / "test.mp4"
        mp4_path.write_bytes(self._make_mp4())
        result = ctx._build_user_content("watch", [str(mp4_path)], supports_video=False)
        assert isinstance(result, list)
        video_blocks = [
            b for b in result
            if b.get("type") == "text" and "[video:" in b.get("text", "")
        ]
        assert len(video_blocks) == 1

    def test_video_supports_none_placeholder(self, ctx, tmp_path):
        """Unconfigured (None) should use [video: path] placeholder."""
        mp4_path = tmp_path / "test.mp4"
        mp4_path.write_bytes(self._make_mp4())
        result = ctx._build_user_content("watch", [str(mp4_path)], supports_video=None)
        assert isinstance(result, list)
        video_blocks = [
            b for b in result
            if b.get("type") == "text" and "[video:" in b.get("text", "")
        ]
        assert len(video_blocks) == 1


# ── Audio/Video input limits ──────────────────────────────────────────

class TestInputLimitsAudioVideo(_MediaFixtures):
    """Checks the per-file byte limits applied to audio and video inputs."""

    @pytest.fixture
    def ctx(self, tmp_path):
        """ContextBuilder with deliberately tiny audio/video byte limits."""
        return ContextBuilder(
            tmp_path,
            timezone="UTC",
            input_limits=InputLimitsConfig(
                max_input_images=3,
                max_input_image_bytes=10 * 1024 * 1024,
                max_input_audio_bytes=100,  # 100 bytes for testing
                max_input_video_bytes=200,  # 200 bytes for testing
            ),
        )

    def test_oversized_audio_skipped_with_note(self, ctx, tmp_path):
        """Audio exceeding max_input_audio_bytes should be skipped with note."""
        wav_path = tmp_path / "big.wav"
        wav_path.write_bytes(self._make_wav() + b"\x00" * 100)  # 146 bytes > 100 limit
        result = ctx._build_user_content("listen", [str(wav_path)], supports_audio=True)
        assert isinstance(result, str)
        assert "[Skipped audio: file too large" in result
        assert result.endswith("listen")

    def test_audio_within_limit_accepted(self, ctx, tmp_path):
        """Audio within limit should be sent as input_audio."""
        wav_path = tmp_path / "small.wav"
        wav_path.write_bytes(self._make_wav())  # 46 bytes < 100 limit
        result = ctx._build_user_content("listen", [str(wav_path)], supports_audio=True)
        assert isinstance(result, list)
        assert any(b.get("type") == "input_audio" for b in result)

    def test_oversized_video_skipped_with_note(self, ctx, tmp_path):
        """Video exceeding max_input_video_bytes should be skipped with note."""
        mp4_path = tmp_path / "big.mp4"
        mp4_path.write_bytes(self._make_mp4() + b"\x00" * 200)  # 224 bytes > 200 limit
        result = ctx._build_user_content("watch", [str(mp4_path)], supports_video=True)
        assert isinstance(result, str)
        assert "[Skipped video: file too large" in result

    def test_video_within_limit_accepted(self, ctx, tmp_path):
        """Video within limit should be sent as video_url."""
        mp4_path = tmp_path / "small.mp4"
        mp4_path.write_bytes(self._make_mp4())  # 24 bytes < 200 limit
        result = ctx._build_user_content("watch", [str(mp4_path)], supports_video=True)
        assert isinstance(result, list)
        assert any(b.get("type") == "video_url" for b in result)

    def test_audio_filename_fallback_mp3(self, ctx, tmp_path):
        """MP3 file with unrecognized magic bytes should fallback to filename."""
        mp3_path = tmp_path / "test.mp3"
        mp3_path.write_bytes(b"\x00" * 50)  # unrecognized magic, but .mp3 extension
        result = ctx._build_user_content("listen", [str(mp3_path)], supports_audio=True)
        assert isinstance(result, list)
        audio_blocks = [b for b in result if b.get("type") == "input_audio"]
        assert len(audio_blocks) == 1
        assert audio_blocks[0]["input_audio"]["format"] == "mp3"

    def test_missing_file_gracefully_skipped(self, ctx, tmp_path):
        """Missing file should be gracefully skipped."""
        result = ctx._build_user_content(
            "hello", [str(tmp_path / "ghost.wav")], supports_audio=True
        )
        # Missing file is silently skipped (non-image path uses continue on OSError)
        assert isinstance(result, str)
        assert result == "hello"


# ── _strip_media_content ──────────────────────────────────────────────

class TestStripMediaContent:
    """Checks that ``LLMProvider._strip_media_content`` swaps media blocks for text."""

    def test_no_media_returns_none(self):
        """A message without media blocks yields None."""
        msgs = [{"role": "user", "content": "hello"}]
        assert LLMProvider._strip_media_content(msgs) is None

    def test_strips_image_url(self):
        """An image_url block becomes ``[image: <path>]``; text blocks are kept."""
        msgs = [{"role": "user", "content": [
            {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"},
             "_meta": {"path": "/img.png"}},
            {"type": "text", "text": "desc"},
        ]}]
        result = LLMProvider._strip_media_content(msgs)
        assert result is not None
        assert result[0]["content"][0] == {"type": "text", "text": "[image: /img.png]"}
        assert result[0]["content"][1] == {"type": "text", "text": "desc"}

    def test_strips_input_audio(self):
        """An input_audio block becomes ``[audio: <path>]``."""
        msgs = [{"role": "user", "content": [
            {"type": "input_audio", "input_audio": {"data": "abc", "format": "wav"},
             "_meta": {"path": "/audio.wav"}},
            {"type": "text", "text": "desc"},
        ]}]
        result = LLMProvider._strip_media_content(msgs)
        assert result is not None
        assert result[0]["content"][0] == {"type": "text", "text": "[audio: /audio.wav]"}

    def test_strips_both(self):
        """Image and audio blocks in one message are both replaced, order preserved."""
        msgs = [{"role": "user", "content": [
            {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"},
             "_meta": {"path": "/img.png"}},
            {"type": "input_audio", "input_audio": {"data": "abc", "format": "wav"},
             "_meta": {"path": "/audio.wav"}},
        ]}]
        result = LLMProvider._strip_media_content(msgs)
        assert result is not None
        assert len(result[0]["content"]) == 2
        assert "[image:" in result[0]["content"][0]["text"]
        assert "[audio:" in result[0]["content"][1]["text"]

    def test_strips_video_url(self):
        """A video_url block becomes ``[video: <path>]``; text blocks are kept."""
        msgs = [{"role": "user", "content": [
            {"type": "video_url", "video_url": {"url": "data:video/mp4;base64,abc"},
             "_meta": {"path": "/video.mp4"}},
            {"type": "text", "text": "desc"},
        ]}]
        result = LLMProvider._strip_media_content(msgs)
        assert result is not None
        assert result[0]["content"][0] == {"type": "text", "text": "[video: /video.mp4]"}
        assert result[0]["content"][1] == {"type": "text", "text": "desc"}

    def test_string_content_unchanged(self):
        """Plain string content contains nothing to strip, so None is returned."""
        msgs = [{"role": "user", "content": "plain text"}]
        assert LLMProvider._strip_media_content(msgs) is None


# ── _strip_image_content backward compat ──────────────────────────────

class TestStripImageContentCompat:
    """Checks that the older ``_strip_image_content`` entry point is still callable."""

    def test_still_works(self):
        """The legacy name still replaces an image block with its placeholder."""
        msgs = [{"role": "user", "content": [
            {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"},
             "_meta": {"path": "/img.png"}},
        ]}]
        result = LLMProvider._strip_image_content(msgs)
        assert result is not None
        assert "[image: /img.png]" in result[0]["content"][0]["text"]


# ── _sanitize_persisted_blocks for input_audio ────────────────────────

class TestSanitizePersistedBlocks:
    """Checks ``AgentLoop._sanitize_persisted_blocks`` for each media block type."""

    @pytest.fixture
    def loop_mock(self):
        """Bare AgentLoop instance created without running ``__init__``."""
        from nanobot.agent.loop import AgentLoop
        loop = object.__new__(AgentLoop)
        return loop

    def test_audio_block_replaced_with_placeholder(self, loop_mock):
        """An input_audio block is persisted as ``[audio: <path>]``."""
        content = [
            {"type": "input_audio", "input_audio": {"data": "abc", "format": "wav"},
             "_meta": {"path": "/audio.wav"}},
            {"type": "text", "text": "hello"},
        ]
        result = loop_mock._sanitize_persisted_blocks(content)
        assert len(result) == 2
        assert result[0] == {"type": "text", "text": "[audio: /audio.wav]"}
        assert result[1] == {"type": "text", "text": "hello"}

    def test_image_block_replaced(self, loop_mock):
        """A data-URI image_url block is persisted as ``[image: <path>]``."""
        content = [
            {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"},
             "_meta": {"path": "/img.png"}},
        ]
        result = loop_mock._sanitize_persisted_blocks(content)
        assert len(result) == 1
        assert "[image: /img.png]" in result[0]["text"]

    def test_video_block_replaced_with_placeholder(self, loop_mock):
        """A video_url block is persisted as ``[video: <path>]``."""
        content = [
            {"type": "video_url", "video_url": {"url": "data:video/mp4;base64,abc"},
             "_meta": {"path": "/video.mp4"}},
            {"type": "text", "text": "hello"},
        ]
        result = loop_mock._sanitize_persisted_blocks(content)
        assert len(result) == 2
        assert result[0] == {"type": "text", "text": "[video: /video.mp4]"}
        assert result[1] == {"type": "text", "text": "hello"}

    def test_non_data_image_unchanged(self, loop_mock):
        """An image_url whose URL is not a ``data:`` URI should pass through."""
        content = [
            {"type": "image_url", "image_url": {"url": "https://example.com/img.png"}},
        ]
        result = loop_mock._sanitize_persisted_blocks(content)
        assert len(result) == 1
        assert result[0]["type"] == "image_url"


# ── Anthropic provider input_audio handling ────────────────────────────

class TestAnthropicAudioConversion:
    """Checks how the Anthropic provider converts input_audio user blocks."""

    def test_input_audio_converted_to_text(self):
        """An input_audio block is turned into a text block containing ``[audio:``."""
        from nanobot.providers.anthropic_provider import AnthropicProvider
        # Bypass __init__ so no client or credentials are required.
        provider = AnthropicProvider.__new__(AnthropicProvider)
        content = [
            {"type": "input_audio", "input_audio": {"data": "abc", "format": "wav"},
             "_meta": {"path": "/test.wav"}},
            {"type": "text", "text": "listen"},
        ]
        result = provider._convert_user_content(content)
        assert isinstance(result, list)
        assert any("[audio:" in b.get("text", "") for b in result if b.get("type") == "text")


# ── OpenAI Codex provider input_audio handling ─────────────────────────

class TestCodexAudioConversion:
    """Checks how the Codex provider converts input_audio user blocks."""

    def test_input_audio_passed_through(self):
        """An input_audio block is forwarded with its payload intact."""
        from nanobot.providers.openai_codex_provider import _convert_user_message
        content = [
            {"type": "input_audio", "input_audio": {"data": "abc123", "format": "wav"}},
            {"type": "text", "text": "listen"},
        ]
        result = _convert_user_message(content)
        assert result["role"] == "user"
        audio_items = [i for i in result["content"] if i.get("type") == "input_audio"]
        assert len(audio_items) == 1
        assert audio_items[0]["input_audio"]["data"] == "abc123"