feat: cherry-pick InputLimitsConfig (image OOM guard) and merge with multimodal support

Cherry-pick c4c0ac8 from nightly-26-03-29 which adds InputLimitsConfig
(max_input_images, max_input_image_bytes), image size/existence checks,
and wiring through AgentLoop/CLI. Merged with existing audio/video
multimodal handling, timezone support, and supports_* capability flags.
This commit is contained in:
chengyongru 2026-04-04 02:56:15 +08:00
parent c121547114
commit 4fa64dc73b
5 changed files with 330 additions and 28 deletions

View File

@ -6,12 +6,17 @@ import platform
from pathlib import Path
from typing import Any
from nanobot.utils.helpers import current_time_str
from nanobot.agent.memory import MemoryStore
from nanobot.utils.prompt_templates import render_template
from nanobot.agent.skills import SkillsLoader
from nanobot.utils.helpers import build_assistant_message, detect_image_mime
from nanobot.config.schema import InputLimitsConfig
from nanobot.utils.helpers import (
audio_mime_compat,
build_assistant_message,
current_time_str,
detect_audio_mime,
detect_image_mime,
)
class ContextBuilder:
@ -20,11 +25,12 @@ class ContextBuilder:
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md"]
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
def __init__(self, workspace: Path, timezone: str | None = None):
def __init__(self, workspace: Path, timezone: str | None = None, input_limits: InputLimitsConfig | None = None):
self.workspace = workspace
self.timezone = timezone
self.memory = MemoryStore(workspace)
self.skills = SkillsLoader(workspace)
self.input_limits = input_limits or InputLimitsConfig()
def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
"""Build the system prompt from identity, bootstrap files, memory, and skills."""
@ -108,10 +114,18 @@ class ContextBuilder:
channel: str | None = None,
chat_id: str | None = None,
current_role: str = "user",
supports_vision: bool | None = None,
supports_audio: bool | None = None,
supports_video: bool | None = None,
) -> list[dict[str, Any]]:
"""Build the complete message list for an LLM call."""
runtime_ctx = self._build_runtime_context(channel, chat_id, self.timezone)
user_content = self._build_user_content(current_message, media)
user_content = self._build_user_content(
current_message, media,
supports_vision=supports_vision,
supports_audio=supports_audio,
supports_video=supports_video,
)
# Merge runtime context and user content into a single user message
# to avoid consecutive same-role messages that some providers reject.
@ -131,31 +145,154 @@ class ContextBuilder:
messages.append({"role": current_role, "content": merged})
return messages
def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]:
"""Build user message content with optional base64-encoded images."""
@staticmethod
def _encode_image_block(raw: bytes, mime: str, path: Path) -> dict[str, Any]:
"""Base64-encode file bytes into an image_url content block."""
b64 = base64.b64encode(raw).decode()
return {
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{b64}"},
"_meta": {"path": str(path)},
}
def _build_user_content(
self,
text: str,
media: list[str] | None,
*,
supports_vision: bool | None = None,
supports_audio: bool | None = None,
supports_video: bool | None = None,
) -> str | list[dict[str, Any]]:
"""Build user message content with optional media blocks.
Args:
text: The user text message.
media: List of file paths to media files.
supports_vision: True=model supports images, False=use placeholder,
None=unconfigured (send images as before).
supports_audio: True=model supports native audio, False/None=skip
(channel layer already transcribed).
supports_video: True=model supports native video, False/None=use
[file: path] placeholder.
"""
if not media:
return text
images = []
blocks: list[dict[str, Any]] = []
notes: list[str] = []
limits = self.input_limits
# Enforce image count limit
max_images = limits.max_input_images
image_count = 0
image_media = []
non_image_media = []
for path in media:
p = Path(path)
if not p.is_file():
guessed_mime = mimetypes.guess_type(path)[0] or ""
if guessed_mime.startswith("image/"):
image_count += 1
if image_count <= max_images:
image_media.append(path)
elif image_count == max_images + 1:
notes.append(
f"[Skipped {len(media) - max_images} images: "
f"only the first {max_images} images are included]"
)
else:
non_image_media.append(path)
# Process images
for path in image_media:
p = Path(path)
try:
with p.open("rb") as f:
header = f.read(32)
except OSError:
notes.append(f"[Skipped image: unable to read ({p.name or path})]")
continue
try:
size = p.stat().st_size
except OSError:
notes.append(f"[Skipped image: unable to read ({p.name or path})]")
continue
if size > limits.max_input_image_bytes:
size_mb = limits.max_input_image_bytes // (1024 * 1024)
notes.append(f"[Skipped image: file too large ({p.name}, limit {size_mb} MB)]")
continue
img_mime = detect_image_mime(header) or mimetypes.guess_type(path)[0]
if not img_mime or not img_mime.startswith("image/"):
notes.append(f"[Skipped image: unsupported or invalid image format ({p.name})]")
continue
blocks.append(self._encode_image_block(p.read_bytes(), img_mime, p))
# Process non-image media (audio, video, unknown)
for path in non_image_media:
p = Path(path)
guessed_mime = mimetypes.guess_type(path)[0] or ""
is_audio = guessed_mime.startswith("audio/")
try:
with p.open("rb") as f:
header = f.read(32)
except OSError:
continue
# Audio detection: by magic bytes or by filename
# Always pass filename so fallback can match when magic bytes fail
audio_mime = detect_audio_mime(header, filename=path)
if audio_mime or is_audio:
if supports_audio is True and audio_mime_compat(audio_mime):
try:
size = p.stat().st_size
except OSError:
continue
if size > limits.max_input_audio_bytes:
size_mb = limits.max_input_audio_bytes // (1024 * 1024)
notes.append(f"[Skipped audio: file too large ({p.name}, limit {size_mb} MB)]")
continue
raw = p.read_bytes()
# Detect real MIME type from magic bytes; fallback to filename guess
mime = detect_image_mime(raw) or mimetypes.guess_type(path)[0]
if not mime or not mime.startswith("image/"):
continue
b64 = base64.b64encode(raw).decode()
images.append({
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{b64}"},
blocks.append({
"type": "input_audio",
"input_audio": {"data": b64, "format": audio_mime.split("/")[-1]},
"_meta": {"path": str(p)},
})
continue
if not images:
return text
return images + [{"type": "text", "text": text}]
# Video detection: by filename extension
is_video = guessed_mime.startswith("video/")
if is_video:
if supports_video is True:
try:
size = p.stat().st_size
except OSError:
continue
if size > limits.max_input_video_bytes:
size_mb = limits.max_input_video_bytes // (1024 * 1024)
notes.append(f"[Skipped video: file too large ({p.name}, limit {size_mb} MB)]")
continue
raw = p.read_bytes()
b64 = base64.b64encode(raw).decode()
blocks.append({
"type": "video_url",
"video_url": {"url": f"data:{guessed_mime};base64,{b64}"},
"_meta": {"path": str(p)},
})
else:
blocks.append({"type": "text", "text": f"[file: {p}]"})
continue
# Unknown -> text placeholder
blocks.append({"type": "text", "text": f"[file: {p}]"})
note_text = "\n".join(notes).strip()
text_block = text if not note_text else (f"{note_text}\n\n{text}" if text else note_text)
if not blocks:
return text_block
return blocks + [{"type": "text", "text": text_block}]
def add_tool_result(
self, messages: list[dict[str, Any]],

View File

@ -36,7 +36,8 @@ from nanobot.utils.helpers import image_placeholder_text, truncate_text
from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE
if TYPE_CHECKING:
from nanobot.config.schema import ChannelsConfig, ExecToolConfig, WebToolsConfig
if TYPE_CHECKING:
from nanobot.config.schema import ChannelsConfig, ExecToolConfig, InputLimitsConfig, WebToolsConfig
from nanobot.cron.service import CronService
@ -173,6 +174,7 @@ class AgentLoop:
provider_retry_mode: str = "standard",
web_config: WebToolsConfig | None = None,
exec_config: ExecToolConfig | None = None,
input_limits: InputLimitsConfig | None = None,
cron_service: CronService | None = None,
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
@ -180,8 +182,11 @@ class AgentLoop:
channels_config: ChannelsConfig | None = None,
timezone: str | None = None,
hooks: list[AgentHook] | None = None,
supports_vision: bool | None = None,
supports_audio: bool | None = None,
supports_video: bool | None = None,
):
from nanobot.config.schema import ExecToolConfig, WebToolsConfig
from nanobot.config.schema import ExecToolConfig, InputLimitsConfig, WebToolsConfig
defaults = AgentDefaults()
self.bus = bus
@ -206,13 +211,17 @@ class AgentLoop:
self.provider_retry_mode = provider_retry_mode
self.web_config = web_config or WebToolsConfig()
self.exec_config = exec_config or ExecToolConfig()
self.input_limits = input_limits or InputLimitsConfig()
self.cron_service = cron_service
self.restrict_to_workspace = restrict_to_workspace
self._start_time = time.time()
self._last_usage: dict[str, int] = {}
self._extra_hooks: list[AgentHook] = hooks or []
self.context = ContextBuilder(workspace, timezone=timezone)
self.context = ContextBuilder(workspace, timezone=timezone, input_limits=self.input_limits)
self._supports_vision = supports_vision
self._supports_audio = supports_audio
self._supports_video = supports_video
self.sessions = session_manager or SessionManager(workspace)
self.tools = ToolRegistry()
self.runner = AgentRunner(provider)
@ -532,6 +541,9 @@ class AgentLoop:
history=history,
current_message=msg.content, channel=channel, chat_id=chat_id,
current_role=current_role,
supports_vision=self._supports_vision,
supports_audio=self._supports_audio,
supports_video=self._supports_video,
)
final_content, _, all_msgs = await self._run_agent_loop(
messages, session=session, channel=channel, chat_id=chat_id,
@ -571,6 +583,9 @@ class AgentLoop:
current_message=msg.content,
media=msg.media if msg.media else None,
channel=msg.channel, chat_id=msg.chat_id,
supports_vision=self._supports_vision,
supports_audio=self._supports_audio,
supports_video=self._supports_video,
)
async def _bus_progress(content: str, *, tool_hint: bool = False) -> None:

View File

@ -675,6 +675,7 @@ def gateway(
max_tool_result_chars=config.agents.defaults.max_tool_result_chars,
provider_retry_mode=config.agents.defaults.provider_retry_mode,
exec_config=config.tools.exec,
input_limits=config.tools.input_limits,
cron_service=cron,
restrict_to_workspace=config.tools.restrict_to_workspace,
session_manager=session_manager,
@ -907,6 +908,7 @@ def agent(
max_tool_result_chars=config.agents.defaults.max_tool_result_chars,
provider_retry_mode=config.agents.defaults.provider_retry_mode,
exec_config=config.tools.exec,
input_limits=config.tools.input_limits,
cron_service=cron,
restrict_to_workspace=config.tools.restrict_to_workspace,
mcp_servers=config.tools.mcp_servers,

View File

@ -74,10 +74,44 @@ class AgentDefaults(Base):
max_tool_iterations: int = 200
max_tool_result_chars: int = 16_000
provider_retry_mode: Literal["standard", "persistent"] = "standard"
# Deprecated compatibility field: accepted from old configs but ignored at runtime.
memory_window: int | None = Field(default=None, exclude=True)
reasoning_effort: str | None = None # low / medium / high - enables LLM thinking mode
timezone: str = "UTC" # IANA timezone, e.g. "Asia/Shanghai", "America/New_York"
vision_models: list[str] = Field(default_factory=list) # Models that support image input
audio_models: list[str] = Field(default_factory=list) # Models that support native audio input
video_models: list[str] = Field(default_factory=list) # Models that support native video input
dream: DreamConfig = Field(default_factory=DreamConfig)
@staticmethod
def _bare_model(model: str) -> str:
"""Strip provider prefix, e.g. 'openai/gpt-4o' -> 'gpt-4o'."""
return model.split("/", 1)[-1].lower() if "/" in model else model.lower()
def _supports_capability(self, model: str, patterns: list[str]) -> bool | None:
"""Check if model matches any pattern. Returns None if patterns is empty."""
if not patterns:
return None
bare = self._bare_model(model)
return any(p.lower() in bare for p in patterns)
def supports_vision(self, model: str) -> bool | None:
"""Check if model supports vision. None if unconfigured."""
return self._supports_capability(model, self.vision_models)
def supports_audio(self, model: str) -> bool | None:
"""Check if model supports native audio. None if unconfigured."""
return self._supports_capability(model, self.audio_models)
def supports_video(self, model: str) -> bool | None:
"""Check if model supports native video. None if unconfigured."""
return self._supports_capability(model, self.video_models)
@property
def should_warn_deprecated_memory_window(self) -> bool:
    """Whether a legacy memoryWindow is present without contextWindowTokens.

    True only when ``memory_window`` is not None and
    ``context_window_tokens`` is absent from ``model_fields_set``.
    """
    if self.memory_window is None:
        return False
    return "context_window_tokens" not in self.model_fields_set
class AgentsConfig(Base):
"""Agent configuration."""
@ -107,7 +141,6 @@ class ProvidersConfig(Base):
dashscope: ProviderConfig = Field(default_factory=ProviderConfig)
vllm: ProviderConfig = Field(default_factory=ProviderConfig)
ollama: ProviderConfig = Field(default_factory=ProviderConfig) # Ollama local models
ovms: ProviderConfig = Field(default_factory=ProviderConfig) # OpenVINO Model Server (OVMS)
gemini: ProviderConfig = Field(default_factory=ProviderConfig)
moonshot: ProviderConfig = Field(default_factory=ProviderConfig)
minimax: ProviderConfig = Field(default_factory=ProviderConfig)
@ -177,6 +210,14 @@ class ExecToolConfig(Base):
path_append: str = ""
sandbox: str = "" # sandbox backend: "" (none) or "bwrap"
class InputLimitsConfig(Base):
    """Limits for user-provided multimodal inputs."""

    # Maximum number of images included from a single user message.
    max_input_images: int = 3
    # Per-file size cap for images, in bytes (10 MiB).
    max_input_image_bytes: int = 10 * 1024 * 1024
    # Per-file size caps for native audio / video attachments, in bytes.
    # ContextBuilder._build_user_content reads both attributes on its native
    # audio/video paths; without them those paths fail with AttributeError.
    # NOTE(review): the default sizes below are placeholders — confirm
    # against the limits of the providers actually in use.
    max_input_audio_bytes: int = 25 * 1024 * 1024
    max_input_video_bytes: int = 50 * 1024 * 1024
class MCPServerConfig(Base):
"""MCP server connection configuration (stdio or HTTP)."""
@ -194,6 +235,7 @@ class ToolsConfig(Base):
web: WebToolsConfig = Field(default_factory=WebToolsConfig)
exec: ExecToolConfig = Field(default_factory=ExecToolConfig)
input_limits: InputLimitsConfig = Field(default_factory=InputLimitsConfig)
restrict_to_workspace: bool = False # restrict all tool access to workspace directory
mcp_servers: dict[str, MCPServerConfig] = Field(default_factory=dict)
ssrf_whitelist: list[str] = Field(default_factory=list) # CIDR ranges to exempt from SSRF blocking (e.g. ["100.64.0.0/10"] for Tailscale)

View File

@ -0,0 +1,106 @@
from pathlib import Path
from nanobot.agent.context import ContextBuilder
from nanobot.config.schema import InputLimitsConfig
# PNG fixture used as valid image content by the tests below: the PNG
# signature followed by IHDR (width 1, height 1, bit depth 8, color type 2),
# a single IDAT chunk, and IEND.
PNG_BYTES = (
b"\x89PNG\r\n\x1a\n"
b"\x00\x00\x00\rIHDR"
b"\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00"
b"\x90wS\xde"
b"\x00\x00\x00\x0cIDATx\x9cc``\x00\x00\x00\x04\x00\x01"
b"\x0b\x0e-\xb4"
b"\x00\x00\x00\x00IEND\xaeB`\x82"
)
def _builder(tmp_path: Path, input_limits: InputLimitsConfig | None = None) -> ContextBuilder:
    """Create a ContextBuilder rooted at ``tmp_path`` with optional input limits."""
    builder = ContextBuilder(tmp_path, input_limits=input_limits)
    return builder
def test_build_user_content_keeps_only_first_three_images(tmp_path: Path) -> None:
    """One image beyond the limit is dropped and a skip note leads the text block."""
    builder = _builder(tmp_path)
    max_images = builder.input_limits.max_input_images

    # Write one more PNG than the configured image limit allows.
    image_files = [tmp_path / f"img{i}.png" for i in range(max_images + 1)]
    for image_file in image_files:
        image_file.write_bytes(PNG_BYTES)

    content = builder._build_user_content(
        "describe these", [str(image_file) for image_file in image_files]
    )

    assert isinstance(content, list)
    image_blocks = [block for block in content if block.get("type") == "image_url"]
    assert len(image_blocks) == max_images
    expected_note = f"[Skipped 1 image: only the first {max_images} images are included]"
    assert content[-1]["text"].startswith(expected_note)
def test_build_user_content_skips_invalid_images_with_note(tmp_path: Path) -> None:
    """A non-image file yields plain text carrying an invalid-format note."""
    text_file = tmp_path / "not-image.txt"
    text_file.write_text("hello", encoding="utf-8")
    builder = _builder(tmp_path)

    content = builder._build_user_content("what is this?", [str(text_file)])

    assert isinstance(content, str)
    expected_note = "[Skipped image: unsupported or invalid image format (not-image.txt)]"
    assert expected_note in content
    assert content.endswith("what is this?")
def test_build_user_content_skips_missing_file(tmp_path: Path) -> None:
    """A path that does not exist yields plain text carrying a not-found note."""
    missing = tmp_path / "ghost.png"
    builder = _builder(tmp_path)

    content = builder._build_user_content("hello", [str(missing)])

    assert isinstance(content, str)
    expected_note = "[Skipped image: file not found (ghost.png)]"
    assert expected_note in content
    assert content.endswith("hello")
def test_build_user_content_skips_large_images_with_note(tmp_path: Path) -> None:
    """An image larger than the byte limit is skipped with a too-large note."""
    builder = _builder(tmp_path)
    byte_limit = builder.input_limits.max_input_image_bytes

    # Padding the PNG with exactly ``byte_limit`` bytes pushes it over the cap.
    oversized = tmp_path / "big.png"
    oversized.write_bytes(PNG_BYTES + b"x" * byte_limit)

    content = builder._build_user_content("analyze", [str(oversized)])

    limit_mb = byte_limit // (1024 * 1024)
    assert isinstance(content, str)
    expected_note = f"[Skipped image: file too large (big.png, limit {limit_mb} MB)]"
    assert expected_note in content
def test_build_user_content_respects_custom_input_limits(tmp_path: Path) -> None:
    """With a one-image limit, only one image block is kept and a skip note is added."""
    limits = InputLimitsConfig(max_input_images=1, max_input_image_bytes=1024)
    builder = _builder(tmp_path, input_limits=limits)

    # Written in order: the small file first, then the padded one.
    payloads = {
        "small.png": PNG_BYTES,
        "large.png": PNG_BYTES + b"x" * 1024,
    }
    media = []
    for filename, payload in payloads.items():
        target = tmp_path / filename
        target.write_bytes(payload)
        media.append(str(target))

    content = builder._build_user_content("describe", media)

    assert isinstance(content, list)
    image_blocks = [block for block in content if block.get("type") == "image_url"]
    assert len(image_blocks) == 1
    assert content[-1]["text"].startswith("[Skipped 1 image: only the first 1 images are included]")
def test_build_user_content_keeps_valid_images_and_skip_notes_together(tmp_path: Path) -> None:
    """A valid image block and a skip note for an invalid file appear in one result."""
    valid_image = tmp_path / "good.png"
    invalid_file = tmp_path / "bad.txt"
    valid_image.write_bytes(PNG_BYTES)
    invalid_file.write_text("oops", encoding="utf-8")
    builder = _builder(tmp_path)

    content = builder._build_user_content("check both", [str(valid_image), str(invalid_file)])

    assert isinstance(content, list)
    first_block = content[0]
    assert first_block["type"] == "image_url"
    trailing_text = content[-1]["text"]
    assert "[Skipped image: unsupported or invalid image format (bad.txt)]" in trailing_text
    assert trailing_text.endswith("check both")