mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-19 16:12:30 +00:00
- Add 42 tests for ContextBuilder (context.py: 0→42 tests) - Add 37 tests for SubagentManager lifecycle (subagent.py: 2→37 tests) - Add 42 unit tests for AutoCompact in isolation - Split monolithic test_runner.py (3313 lines) into 9 focused files: test_runner_core, test_runner_hooks, test_runner_errors, test_runner_safety, test_runner_persistence, test_runner_governance, test_runner_tool_execution, test_runner_injections, test_loop_runner_integration - Add 3 config passthrough tests (temperature/max_tokens/reasoning_effort) - Fix fragile patch.object(__init__) in test_stop_preserves_context - Create shared conftest.py with make_provider/make_loop factories Total: 934 tests passing, 0 regressions
334 lines
13 KiB
Python
334 lines
13 KiB
Python
"""Tests for ContextBuilder — system prompt and message assembly."""
|
|
|
|
import base64
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.agent.context import ContextBuilder
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _builder(tmp_path: Path, **kw) -> ContextBuilder:
|
|
return ContextBuilder(workspace=tmp_path, **kw)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _build_runtime_context (static)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBuildRuntimeContext:
|
|
def test_time_only(self):
|
|
ctx = ContextBuilder._build_runtime_context(None, None)
|
|
assert "[Runtime Context" in ctx
|
|
assert "[/Runtime Context]" in ctx
|
|
assert "Current Time:" in ctx
|
|
assert "Channel:" not in ctx
|
|
|
|
def test_with_channel_and_chat_id(self):
|
|
ctx = ContextBuilder._build_runtime_context("telegram", "chat123")
|
|
assert "Channel: telegram" in ctx
|
|
assert "Chat ID: chat123" in ctx
|
|
|
|
def test_with_sender_id(self):
|
|
ctx = ContextBuilder._build_runtime_context("cli", "direct", sender_id="user1")
|
|
assert "Sender ID: user1" in ctx
|
|
|
|
def test_with_timezone(self):
|
|
ctx = ContextBuilder._build_runtime_context(None, None, timezone="Asia/Shanghai")
|
|
assert "Current Time:" in ctx
|
|
|
|
def test_no_channel_no_chat_id_omits_both(self):
|
|
ctx = ContextBuilder._build_runtime_context(None, None)
|
|
assert "Channel:" not in ctx
|
|
assert "Chat ID:" not in ctx
|
|
|
|
def test_no_sender_id_omits(self):
|
|
ctx = ContextBuilder._build_runtime_context("cli", "direct")
|
|
assert "Sender ID:" not in ctx
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _merge_message_content (static)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMergeMessageContent:
|
|
def test_str_plus_str(self):
|
|
result = ContextBuilder._merge_message_content("hello", "world")
|
|
assert result == "hello\n\nworld"
|
|
|
|
def test_empty_left_plus_str(self):
|
|
result = ContextBuilder._merge_message_content("", "world")
|
|
assert result == "world"
|
|
|
|
def test_list_plus_list(self):
|
|
left = [{"type": "text", "text": "a"}]
|
|
right = [{"type": "text", "text": "b"}]
|
|
result = ContextBuilder._merge_message_content(left, right)
|
|
assert len(result) == 2
|
|
assert result[0]["text"] == "a"
|
|
assert result[1]["text"] == "b"
|
|
|
|
def test_str_plus_list(self):
|
|
right = [{"type": "text", "text": "b"}]
|
|
result = ContextBuilder._merge_message_content("hello", right)
|
|
assert len(result) == 2
|
|
assert result[0]["text"] == "hello"
|
|
assert result[1]["text"] == "b"
|
|
|
|
def test_list_plus_str(self):
|
|
left = [{"type": "text", "text": "a"}]
|
|
result = ContextBuilder._merge_message_content(left, "world")
|
|
assert len(result) == 2
|
|
assert result[0]["text"] == "a"
|
|
assert result[1]["text"] == "world"
|
|
|
|
def test_none_plus_str(self):
|
|
result = ContextBuilder._merge_message_content(None, "hello")
|
|
assert result == [{"type": "text", "text": "hello"}]
|
|
|
|
def test_str_plus_none(self):
|
|
result = ContextBuilder._merge_message_content("hello", None)
|
|
assert result == [{"type": "text", "text": "hello"}]
|
|
|
|
def test_none_plus_none(self):
|
|
result = ContextBuilder._merge_message_content(None, None)
|
|
assert result == []
|
|
|
|
def test_list_items_not_dicts_wrapped(self):
|
|
result = ContextBuilder._merge_message_content(["raw_item"], None)
|
|
assert result == [{"type": "text", "text": "raw_item"}]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _load_bootstrap_files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestLoadBootstrapFiles:
|
|
def test_no_bootstrap_files(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
assert builder._load_bootstrap_files() == ""
|
|
|
|
def test_agents_md(self, tmp_path):
|
|
(tmp_path / "AGENTS.md").write_text("Be helpful.", encoding="utf-8")
|
|
builder = _builder(tmp_path)
|
|
result = builder._load_bootstrap_files()
|
|
assert "## AGENTS.md" in result
|
|
assert "Be helpful." in result
|
|
|
|
def test_multiple_bootstrap_files(self, tmp_path):
|
|
(tmp_path / "AGENTS.md").write_text("Rules.", encoding="utf-8")
|
|
(tmp_path / "SOUL.md").write_text("Soul.", encoding="utf-8")
|
|
builder = _builder(tmp_path)
|
|
result = builder._load_bootstrap_files()
|
|
assert "## AGENTS.md" in result
|
|
assert "## SOUL.md" in result
|
|
assert "Rules." in result
|
|
assert "Soul." in result
|
|
|
|
def test_all_bootstrap_files(self, tmp_path):
|
|
for name in ContextBuilder.BOOTSTRAP_FILES:
|
|
(tmp_path / name).write_text(f"Content of {name}", encoding="utf-8")
|
|
builder = _builder(tmp_path)
|
|
result = builder._load_bootstrap_files()
|
|
for name in ContextBuilder.BOOTSTRAP_FILES:
|
|
assert f"## {name}" in result
|
|
|
|
def test_utf8_content(self, tmp_path):
|
|
(tmp_path / "AGENTS.md").write_text("用中文回复", encoding="utf-8")
|
|
builder = _builder(tmp_path)
|
|
result = builder._load_bootstrap_files()
|
|
assert "用中文回复" in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _is_template_content (static)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsTemplateContent:
|
|
def test_nonexistent_template_returns_false(self):
|
|
assert ContextBuilder._is_template_content("anything", "nonexistent/path.md") is False
|
|
|
|
def test_content_matching_template(self):
|
|
from importlib.resources import files as pkg_files
|
|
tpl = pkg_files("nanobot") / "templates" / "memory" / "MEMORY.md"
|
|
if not tpl.is_file():
|
|
pytest.skip("MEMORY.md template not bundled")
|
|
original = tpl.read_text(encoding="utf-8")
|
|
assert ContextBuilder._is_template_content(original, "memory/MEMORY.md") is True
|
|
|
|
def test_modified_content_returns_false(self):
|
|
from importlib.resources import files as pkg_files
|
|
tpl = pkg_files("nanobot") / "templates" / "memory" / "MEMORY.md"
|
|
if not tpl.is_file():
|
|
pytest.skip("MEMORY.md template not bundled")
|
|
assert ContextBuilder._is_template_content("totally different", "memory/MEMORY.md") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _build_user_content
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBuildUserContent:
|
|
def test_no_media_returns_string(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
result = builder._build_user_content("hello", None)
|
|
assert result == "hello"
|
|
|
|
def test_empty_media_returns_string(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
result = builder._build_user_content("hello", [])
|
|
assert result == "hello"
|
|
|
|
def test_nonexistent_media_file_returns_string(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
result = builder._build_user_content("hello", ["/nonexistent/image.png"])
|
|
assert result == "hello"
|
|
|
|
def test_non_image_file_returns_string(self, tmp_path):
|
|
txt = tmp_path / "doc.txt"
|
|
txt.write_text("not an image", encoding="utf-8")
|
|
builder = _builder(tmp_path)
|
|
result = builder._build_user_content("hello", [str(txt)])
|
|
assert result == "hello"
|
|
|
|
def test_valid_image_returns_list(self, tmp_path):
|
|
png = tmp_path / "test.png"
|
|
png.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 16)
|
|
builder = _builder(tmp_path)
|
|
result = builder._build_user_content("hello", [str(png)])
|
|
assert isinstance(result, list)
|
|
assert len(result) == 2
|
|
assert result[0]["type"] == "image_url"
|
|
assert result[0]["image_url"]["url"].startswith("data:image/png;base64,")
|
|
assert result[1]["type"] == "text"
|
|
assert result[1]["text"] == "hello"
|
|
|
|
def test_image_meta_includes_path(self, tmp_path):
|
|
png = tmp_path / "test.png"
|
|
png.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 16)
|
|
builder = _builder(tmp_path)
|
|
result = builder._build_user_content("hello", [str(png)])
|
|
assert "_meta" in result[0]
|
|
assert "path" in result[0]["_meta"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# build_system_prompt
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBuildSystemPrompt:
|
|
def test_returns_nonempty_string(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
result = builder.build_system_prompt()
|
|
assert isinstance(result, str)
|
|
assert len(result) > 0
|
|
|
|
def test_includes_identity_section(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
result = builder.build_system_prompt()
|
|
assert "workspace" in result.lower() or "python" in result.lower()
|
|
|
|
def test_includes_bootstrap_files(self, tmp_path):
|
|
(tmp_path / "AGENTS.md").write_text("Be helpful and concise.", encoding="utf-8")
|
|
builder = _builder(tmp_path)
|
|
result = builder.build_system_prompt()
|
|
assert "Be helpful and concise." in result
|
|
|
|
def test_includes_session_summary(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
result = builder.build_system_prompt(session_summary="Previous chat about Python.")
|
|
assert "Previous chat about Python." in result
|
|
assert "[Archived Context Summary]" in result
|
|
|
|
def test_sections_separated_by_separator(self, tmp_path):
|
|
(tmp_path / "AGENTS.md").write_text("Rules.", encoding="utf-8")
|
|
builder = _builder(tmp_path)
|
|
result = builder.build_system_prompt(session_summary="Summary.")
|
|
assert "\n\n---\n\n" in result
|
|
|
|
def test_no_bootstrap_no_summary(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
result = builder.build_system_prompt()
|
|
assert "## AGENTS.md" not in result
|
|
assert "[Archived Context Summary]" not in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# build_messages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBuildMessages:
|
|
def test_basic_empty_history(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
messages = builder.build_messages([], "hello")
|
|
assert len(messages) == 2
|
|
assert messages[0]["role"] == "system"
|
|
assert messages[1]["role"] == "user"
|
|
assert "hello" in str(messages[1]["content"])
|
|
|
|
def test_runtime_context_injected(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
messages = builder.build_messages([], "hello", channel="cli", chat_id="direct")
|
|
user_msg = str(messages[-1]["content"])
|
|
assert "[Runtime Context" in user_msg
|
|
assert "hello" in user_msg
|
|
|
|
def test_consecutive_same_role_merged(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
history = [{"role": "user", "content": "previous user message"}]
|
|
messages = builder.build_messages(history, "new message")
|
|
assert len(messages) == 2 # system + merged user
|
|
assert "previous user message" in str(messages[1]["content"])
|
|
assert "new message" in str(messages[1]["content"])
|
|
|
|
def test_different_role_appended(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
history = [{"role": "assistant", "content": "previous response"}]
|
|
messages = builder.build_messages(history, "new message")
|
|
assert len(messages) == 3 # system + assistant + user
|
|
|
|
def test_media_with_history(self, tmp_path):
|
|
png = tmp_path / "img.png"
|
|
png.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 16)
|
|
builder = _builder(tmp_path)
|
|
history = [{"role": "assistant", "content": "see this"}]
|
|
messages = builder.build_messages(history, "check image", media=[str(png)])
|
|
user_msg = messages[-1]["content"]
|
|
assert isinstance(user_msg, list)
|
|
assert any(b.get("type") == "image_url" for b in user_msg)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# add_tool_result
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAddToolResult:
|
|
def test_appends_tool_message(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
msgs = [{"role": "user", "content": "hello"}]
|
|
result = builder.add_tool_result(msgs, "call_123", "read_file", "file content")
|
|
assert len(result) == 2
|
|
assert result[1]["role"] == "tool"
|
|
assert result[1]["tool_call_id"] == "call_123"
|
|
assert result[1]["name"] == "read_file"
|
|
assert result[1]["content"] == "file content"
|
|
|
|
def test_returns_same_list(self, tmp_path):
|
|
builder = _builder(tmp_path)
|
|
msgs = []
|
|
result = builder.add_tool_result(msgs, "id", "tool", "ok")
|
|
assert result is msgs
|