"""Tests for ContextBuilder — system prompt and message assembly.""" from pathlib import Path import pytest from nanobot.agent.context import ContextBuilder from nanobot.session.goal_state import GOAL_STATE_KEY # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _builder(tmp_path: Path, **kw) -> ContextBuilder: return ContextBuilder(workspace=tmp_path, **kw) # --------------------------------------------------------------------------- # _build_runtime_context (static) # --------------------------------------------------------------------------- class TestBuildRuntimeContext: def test_time_only(self): ctx = ContextBuilder._build_runtime_context(None, None) assert "[Runtime Context" in ctx assert "[/Runtime Context]" in ctx assert "Current Time:" in ctx assert "Channel:" not in ctx def test_with_channel_and_chat_id(self): ctx = ContextBuilder._build_runtime_context("telegram", "chat123") assert "Channel: telegram" in ctx assert "Chat ID: chat123" in ctx def test_with_sender_id(self): ctx = ContextBuilder._build_runtime_context("cli", "direct", sender_id="user1") assert "Sender ID: user1" in ctx def test_with_timezone(self): ctx = ContextBuilder._build_runtime_context(None, None, timezone="Asia/Shanghai") assert "Current Time:" in ctx def test_no_channel_no_chat_id_omits_both(self): ctx = ContextBuilder._build_runtime_context(None, None) assert "Channel:" not in ctx assert "Chat ID:" not in ctx def test_no_sender_id_omits(self): ctx = ContextBuilder._build_runtime_context("cli", "direct") assert "Sender ID:" not in ctx # --------------------------------------------------------------------------- # _merge_message_content (static) # --------------------------------------------------------------------------- class TestMergeMessageContent: def test_str_plus_str(self): result = ContextBuilder._merge_message_content("hello", "world") assert result == 
"hello\n\nworld" def test_empty_left_plus_str(self): result = ContextBuilder._merge_message_content("", "world") assert result == "world" def test_list_plus_list(self): left = [{"type": "text", "text": "a"}] right = [{"type": "text", "text": "b"}] result = ContextBuilder._merge_message_content(left, right) assert len(result) == 2 assert result[0]["text"] == "a" assert result[1]["text"] == "b" def test_str_plus_list(self): right = [{"type": "text", "text": "b"}] result = ContextBuilder._merge_message_content("hello", right) assert len(result) == 2 assert result[0]["text"] == "hello" assert result[1]["text"] == "b" def test_list_plus_str(self): left = [{"type": "text", "text": "a"}] result = ContextBuilder._merge_message_content(left, "world") assert len(result) == 2 assert result[0]["text"] == "a" assert result[1]["text"] == "world" def test_none_plus_str(self): result = ContextBuilder._merge_message_content(None, "hello") assert result == [{"type": "text", "text": "hello"}] def test_str_plus_none(self): result = ContextBuilder._merge_message_content("hello", None) assert result == [{"type": "text", "text": "hello"}] def test_none_plus_none(self): result = ContextBuilder._merge_message_content(None, None) assert result == [] def test_list_items_not_dicts_wrapped(self): result = ContextBuilder._merge_message_content(["raw_item"], None) assert result == [{"type": "text", "text": "raw_item"}] # --------------------------------------------------------------------------- # _load_bootstrap_files # --------------------------------------------------------------------------- class TestLoadBootstrapFiles: def test_no_bootstrap_files(self, tmp_path): builder = _builder(tmp_path) assert builder._load_bootstrap_files() == "" def test_agents_md(self, tmp_path): (tmp_path / "AGENTS.md").write_text("Be helpful.", encoding="utf-8") builder = _builder(tmp_path) result = builder._load_bootstrap_files() assert "## AGENTS.md" in result assert "Be helpful." 
in result def test_multiple_bootstrap_files(self, tmp_path): (tmp_path / "AGENTS.md").write_text("Rules.", encoding="utf-8") (tmp_path / "SOUL.md").write_text("Soul.", encoding="utf-8") builder = _builder(tmp_path) result = builder._load_bootstrap_files() assert "## AGENTS.md" in result assert "## SOUL.md" in result assert "Rules." in result assert "Soul." in result def test_all_bootstrap_files(self, tmp_path): for name in ContextBuilder.BOOTSTRAP_FILES: (tmp_path / name).write_text(f"Content of {name}", encoding="utf-8") builder = _builder(tmp_path) result = builder._load_bootstrap_files() for name in ContextBuilder.BOOTSTRAP_FILES: assert f"## {name}" in result def test_utf8_content(self, tmp_path): (tmp_path / "AGENTS.md").write_text("用中文回复", encoding="utf-8") builder = _builder(tmp_path) result = builder._load_bootstrap_files() assert "用中文回复" in result # --------------------------------------------------------------------------- # _is_template_content (static) # --------------------------------------------------------------------------- class TestIsTemplateContent: def test_nonexistent_template_returns_false(self): assert ContextBuilder._is_template_content("anything", "nonexistent/path.md") is False def test_content_matching_template(self): from importlib.resources import files as pkg_files tpl = pkg_files("nanobot") / "templates" / "memory" / "MEMORY.md" if not tpl.is_file(): pytest.skip("MEMORY.md template not bundled") original = tpl.read_text(encoding="utf-8") assert ContextBuilder._is_template_content(original, "memory/MEMORY.md") is True def test_modified_content_returns_false(self): from importlib.resources import files as pkg_files tpl = pkg_files("nanobot") / "templates" / "memory" / "MEMORY.md" if not tpl.is_file(): pytest.skip("MEMORY.md template not bundled") assert ContextBuilder._is_template_content("totally different", "memory/MEMORY.md") is False # --------------------------------------------------------------------------- # _build_user_content 
# ---------------------------------------------------------------------------


def _write_stub_png(directory, filename):
    """Write the 8-byte PNG signature plus 16 zero bytes to *directory/filename*.

    Returns the path of the file that was written.
    """
    target = directory / filename
    target.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 16)
    return target


class TestBuildUserContent:
    """Expectations on ``ContextBuilder._build_user_content`` with and without media."""

    def test_no_media_returns_string(self, tmp_path):
        """``None`` media: the text is expected back unchanged."""
        content = _builder(tmp_path)._build_user_content("hello", None)
        assert content == "hello"

    def test_empty_media_returns_string(self, tmp_path):
        """An empty media list: the text is expected back unchanged."""
        content = _builder(tmp_path)._build_user_content("hello", [])
        assert content == "hello"

    def test_nonexistent_media_file_returns_string(self, tmp_path):
        """A media path that does not exist: the text is expected back unchanged."""
        missing = ["/nonexistent/image.png"]
        content = _builder(tmp_path)._build_user_content("hello", missing)
        assert content == "hello"

    def test_non_image_file_returns_string(self, tmp_path):
        """A plain text attachment: the text is expected back unchanged."""
        document = tmp_path / "doc.txt"
        document.write_text("not an image", encoding="utf-8")
        content = _builder(tmp_path)._build_user_content("hello", [str(document)])
        assert content == "hello"

    def test_valid_image_returns_list(self, tmp_path):
        """A PNG attachment: an image block followed by a text block is expected."""
        image = _write_stub_png(tmp_path, "test.png")
        content = _builder(tmp_path)._build_user_content("hello", [str(image)])
        assert isinstance(content, list)
        assert len(content) == 2
        image_part, text_part = content
        assert image_part["type"] == "image_url"
        assert image_part["image_url"]["url"].startswith("data:image/png;base64,")
        assert text_part["type"] == "text"
        assert text_part["text"] == "hello"

    def test_image_meta_includes_path(self, tmp_path):
        """The image block is expected to carry a ``_meta`` mapping with a ``path`` key."""
        image = _write_stub_png(tmp_path, "test.png")
        content = _builder(tmp_path)._build_user_content("hello", [str(image)])
        image_part = content[0]
        assert "_meta" in image_part
        assert "path" in image_part["_meta"]


# ---------------------------------------------------------------------------
# build_system_prompt
# ---------------------------------------------------------------------------


class TestBuildSystemPrompt:
    """Expectations on ``ContextBuilder.build_system_prompt``."""

    def test_returns_nonempty_string(self, tmp_path):
        """The prompt is expected to be a non-empty ``str``."""
        prompt = _builder(tmp_path).build_system_prompt()
        assert isinstance(prompt, str)
        assert prompt != ""

    def test_includes_identity_section(self, tmp_path):
        """The prompt is expected to mention "workspace" or "python" (case-insensitive)."""
        lowered = _builder(tmp_path).build_system_prompt().lower()
        assert any(keyword in lowered for keyword in ("workspace", "python"))

    def test_includes_bootstrap_files(self, tmp_path):
        """AGENTS.md content in the workspace is expected inside the prompt."""
        agents_file = tmp_path / "AGENTS.md"
        agents_file.write_text("Be helpful and concise.", encoding="utf-8")
        prompt = _builder(tmp_path).build_system_prompt()
        assert "Be helpful and concise." in prompt

    def test_includes_session_summary(self, tmp_path):
        """A session summary is expected in the prompt together with its label."""
        prompt = _builder(tmp_path).build_system_prompt(
            session_summary="Previous chat about Python.",
        )
        for expected in ("Previous chat about Python.", "[Archived Context Summary]"):
            assert expected in prompt

    def test_sections_separated_by_separator(self, tmp_path):
        """With several sections present, the ``---`` separator is expected in the prompt."""
        agents_file = tmp_path / "AGENTS.md"
        agents_file.write_text("Rules.", encoding="utf-8")
        prompt = _builder(tmp_path).build_system_prompt(session_summary="Summary.")
        assert "\n\n---\n\n" in prompt

    def test_no_bootstrap_no_summary(self, tmp_path):
        """Without bootstrap files or summary, neither heading is expected."""
        prompt = _builder(tmp_path).build_system_prompt()
        for unexpected in ("## AGENTS.md", "[Archived Context Summary]"):
            assert unexpected not in prompt


# ---------------------------------------------------------------------------
# build_messages
# ---------------------------------------------------------------------------


class TestBuildMessages:
    """Expectations on ``ContextBuilder.build_messages``."""

    def test_basic_empty_history(self, tmp_path):
        """Empty history: a system message followed by the user message is expected."""
        messages = _builder(tmp_path).build_messages([], "hello")
        assert len(messages) == 2
        assert [entry["role"] for entry in messages] == ["system", "user"]
        assert "hello" in str(messages[1]["content"])

    def test_runtime_context_injected(self, tmp_path):
        """The last message is expected to hold the runtime context and the user text."""
        messages = _builder(tmp_path).build_messages(
            [],
            "hello",
            channel="cli",
            chat_id="direct",
        )
        last_content = str(messages[-1]["content"])
        for expected in ("[Runtime Context", "hello"):
            assert expected in last_content

    def test_session_metadata_injects_active_goal_state(self, tmp_path):
        """An active goal in session metadata is expected to appear in the last message."""
        goal = {"status": "active", "objective": "Finish docs migration."}
        messages = _builder(tmp_path).build_messages(
            [],
            "hi",
            channel="cli",
            chat_id="x",
            session_metadata={GOAL_STATE_KEY: goal},
        )
        last_content = str(messages[-1]["content"])
        for expected in ("Goal (active):", "Finish docs migration."):
            assert expected in last_content

    def test_consecutive_same_role_merged(self, tmp_path):
        """A trailing user message in history is expected to merge with the new one."""
        prior = [{"role": "user", "content": "previous user message"}]
        messages = _builder(tmp_path).build_messages(prior, "new message")
        # Expected layout: system + merged user.
        assert len(messages) == 2
        merged_content = str(messages[1]["content"])
        for expected in ("previous user message", "new message"):
            assert expected in merged_content

    def test_different_role_appended(self, tmp_path):
        """A trailing assistant message in history is expected to stay separate."""
        prior = [{"role": "assistant", "content": "previous response"}]
        messages = _builder(tmp_path).build_messages(prior, "new message")
        # Expected layout: system + assistant + user.
        assert len(messages) == 3

    def test_media_with_history(self, tmp_path):
        """With an image attached, the last message is expected to hold an image block."""
        image = _write_stub_png(tmp_path, "img.png")
        prior = [{"role": "assistant", "content": "see this"}]
        messages = _builder(tmp_path).build_messages(
            prior,
            "check image",
            media=[str(image)],
        )
        last_content = messages[-1]["content"]
        assert isinstance(last_content, list)
        assert any(part.get("type") == "image_url" for part in last_content)