mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-23 03:15:58 +00:00
Add small guards for multimodal image inputs
This commit is contained in:
parent
2ac7dbfc6d
commit
16f0191c32
@ -18,6 +18,8 @@ class ContextBuilder:
|
|||||||
|
|
||||||
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md"]
|
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md"]
|
||||||
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
|
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
|
||||||
|
_MAX_INPUT_IMAGES = 3
|
||||||
|
_MAX_IMAGE_BYTES = 10 * 1024 * 1024
|
||||||
|
|
||||||
def __init__(self, workspace: Path):
|
def __init__(self, workspace: Path):
|
||||||
self.workspace = workspace
|
self.workspace = workspace
|
||||||
@ -149,21 +151,44 @@ Reply directly with text for conversations. Only use the 'message' tool to send
|
|||||||
return text
|
return text
|
||||||
|
|
||||||
images = []
|
images = []
|
||||||
for path in media:
|
notes: list[str] = []
|
||||||
|
extra_count = max(0, len(media) - self._MAX_INPUT_IMAGES)
|
||||||
|
if extra_count:
|
||||||
|
noun = "image" if extra_count == 1 else "images"
|
||||||
|
notes.append(
|
||||||
|
f"[Skipped {extra_count} {noun}: "
|
||||||
|
f"only the first {self._MAX_INPUT_IMAGES} images are included]"
|
||||||
|
)
|
||||||
|
|
||||||
|
for path in media[:self._MAX_INPUT_IMAGES]:
|
||||||
p = Path(path)
|
p = Path(path)
|
||||||
if not p.is_file():
|
if not p.is_file():
|
||||||
|
notes.append(f"[Skipped image: file not found ({p.name or path})]")
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
size = p.stat().st_size
|
||||||
|
except OSError:
|
||||||
|
notes.append(f"[Skipped image: unable to read ({p.name or path})]")
|
||||||
|
continue
|
||||||
|
if size > self._MAX_IMAGE_BYTES:
|
||||||
|
size_mb = self._MAX_IMAGE_BYTES // (1024 * 1024)
|
||||||
|
notes.append(f"[Skipped image: file too large ({p.name}, limit {size_mb} MB)]")
|
||||||
continue
|
continue
|
||||||
raw = p.read_bytes()
|
raw = p.read_bytes()
|
||||||
# Detect real MIME type from magic bytes; fallback to filename guess
|
# Detect real MIME type from magic bytes; fallback to filename guess
|
||||||
mime = detect_image_mime(raw) or mimetypes.guess_type(path)[0]
|
mime = detect_image_mime(raw) or mimetypes.guess_type(path)[0]
|
||||||
if not mime or not mime.startswith("image/"):
|
if not mime or not mime.startswith("image/"):
|
||||||
|
notes.append(f"[Skipped image: unsupported or invalid image format ({p.name})]")
|
||||||
continue
|
continue
|
||||||
b64 = base64.b64encode(raw).decode()
|
b64 = base64.b64encode(raw).decode()
|
||||||
images.append({"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}})
|
images.append({"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}})
|
||||||
|
|
||||||
|
note_text = "\n".join(notes).strip()
|
||||||
|
text_block = text if not note_text else (f"{note_text}\n\n{text}" if text else note_text)
|
||||||
|
|
||||||
if not images:
|
if not images:
|
||||||
return text
|
return text_block
|
||||||
return images + [{"type": "text", "text": text}]
|
return images + [{"type": "text", "text": text_block}]
|
||||||
|
|
||||||
def add_tool_result(
|
def add_tool_result(
|
||||||
self, messages: list[dict[str, Any]],
|
self, messages: list[dict[str, Any]],
|
||||||
|
|||||||
88
tests/test_context_multimodal.py
Normal file
88
tests/test_context_multimodal.py
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from nanobot.agent.context import ContextBuilder
|
||||||
|
|
||||||
|
|
||||||
|
# Small PNG-style byte string used as image fixture data by the tests below.
# It begins with the PNG file signature; presumably that is what the MIME
# sniffing in ContextBuilder keys on -- TODO confirm against detect_image_mime.
PNG_BYTES = b"".join(
    (
        b"\x89PNG\r\n\x1a\n",  # PNG file signature
        b"\x00\x00\x00\rIHDR",  # IHDR chunk: length field + type
        b"\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00",  # IHDR payload
        b"\x90wS\xde",  # IHDR CRC field
        b"\x00\x00\x00\x0cIDATx\x9cc``\x00\x00\x00\x04\x00\x01",  # IDAT chunk
        b"\x0b\x0e-\xb4",  # IDAT CRC field
        b"\x00\x00\x00\x00IEND\xaeB`\x82",  # IEND chunk
    )
)
|
||||||
|
|
||||||
|
|
||||||
|
def _builder(tmp_path: Path) -> ContextBuilder:
    """Create a ``ContextBuilder`` that uses *tmp_path* as its workspace."""
    context_builder = ContextBuilder(tmp_path)
    return context_builder
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_user_content_keeps_only_first_three_images(tmp_path: Path) -> None:
    """Only ``_MAX_INPUT_IMAGES`` images are attached; the overflow is reported in a note."""
    builder = _builder(tmp_path)
    limit = ContextBuilder._MAX_INPUT_IMAGES

    # One file more than the cap, so exactly one image has to be dropped.
    image_files = [tmp_path / f"img{index}.png" for index in range(limit + 1)]
    for image_file in image_files:
        image_file.write_bytes(PNG_BYTES)

    content = builder._build_user_content(
        "describe these",
        [str(image_file) for image_file in image_files],
    )

    assert isinstance(content, list)
    image_blocks = [block for block in content if block.get("type") == "image_url"]
    assert len(image_blocks) == limit
    # The skip note is placed at the start of the trailing text block.
    expected_note = f"[Skipped 1 image: only the first {limit} images are included]"
    assert content[-1]["text"].startswith(expected_note)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_user_content_skips_invalid_images_with_note(tmp_path: Path) -> None:
    """A non-image file produces plain text: a skip note followed by the prompt."""
    builder = _builder(tmp_path)
    text_file = tmp_path / "not-image.txt"
    text_file.write_text("hello", encoding="utf-8")

    result = builder._build_user_content("what is this?", [str(text_file)])

    # With no usable image the builder returns a string, not a block list.
    assert isinstance(result, str)
    expected_note = "[Skipped image: unsupported or invalid image format (not-image.txt)]"
    assert expected_note in result
    assert result.endswith("what is this?")
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_user_content_skips_missing_file(tmp_path: Path) -> None:
    """A path that does not exist is reported in a note and the prompt is kept."""
    builder = _builder(tmp_path)
    # Never created on disk, so the builder must treat it as missing.
    missing_path = tmp_path / "ghost.png"

    result = builder._build_user_content("hello", [str(missing_path)])

    assert isinstance(result, str)
    expected_note = "[Skipped image: file not found (ghost.png)]"
    assert expected_note in result
    assert result.endswith("hello")
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_user_content_skips_large_images_with_note(tmp_path: Path) -> None:
    """An image larger than ``_MAX_IMAGE_BYTES`` is skipped with a size-limit note.

    The oversized file is produced by extending it with ``truncate`` rather
    than by building a multi-megabyte ``bytes`` object, which keeps the test
    cheap while still exceeding the limit by one byte.
    """
    builder = _builder(tmp_path)
    big = tmp_path / "big.png"
    with big.open("wb") as handle:
        handle.write(PNG_BYTES)
        # Grow the file to one byte past the limit without writing filler data.
        handle.truncate(ContextBuilder._MAX_IMAGE_BYTES + 1)

    content = builder._build_user_content("analyze", [str(big)])

    limit_mb = ContextBuilder._MAX_IMAGE_BYTES // (1024 * 1024)
    assert isinstance(content, str)
    assert f"[Skipped image: file too large (big.png, limit {limit_mb} MB)]" in content
    # As in the sibling tests, the user's prompt must survive after the note.
    assert content.endswith("analyze")
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_user_content_keeps_valid_images_and_skip_notes_together(tmp_path: Path) -> None:
    """A valid image is attached while an invalid sibling is reported in the text block."""
    builder = _builder(tmp_path)
    valid_image = tmp_path / "good.png"
    invalid_file = tmp_path / "bad.txt"
    valid_image.write_bytes(PNG_BYTES)
    invalid_file.write_text("oops", encoding="utf-8")

    content = builder._build_user_content(
        "check both",
        [str(valid_image), str(invalid_file)],
    )

    assert isinstance(content, list)
    first_block = content[0]
    assert first_block["type"] == "image_url"
    # The final block carries the skip note followed by the original prompt.
    trailing_text = content[-1]["text"]
    assert "[Skipped image: unsupported or invalid image format (bad.txt)]" in trailing_text
    assert trailing_text.endswith("check both")
|
||||||
Loading…
x
Reference in New Issue
Block a user