nanobot/tests/test_nanobot_facade.py
Mohamed Elkholy ca7877f272 fix(sdk): populate RunResult.tools_used and RunResult.messages
``Nanobot.run()`` has always documented ``RunResult.tools_used`` and
``RunResult.messages`` but actually returned ``[]`` for both, so SDK
consumers could never inspect which tools fired or what the final
message list looked like — the only useful field was ``content``.

This threads the data out via a tiny ``_SDKCaptureHook`` that installs
alongside any user-supplied hooks. The capture hook accumulates tool
names across iterations and snapshots the message list on each
``after_iteration`` call; the last snapshot reflects end-of-turn state.

Only the SDK facade is touched: ``AgentLoop.process_direct`` and
``AgentRunner`` signatures are unchanged, so channels / CLI / API paths
are unaffected.
2026-05-05 23:23:29 +08:00

302 lines
10 KiB
Python

"""Tests for the Nanobot programmatic facade."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from nanobot.nanobot import Nanobot, RunResult
def _write_config(tmp_path: Path, overrides: dict | None = None) -> Path:
data = {
"providers": {"openrouter": {"apiKey": "sk-test-key"}},
"agents": {"defaults": {"model": "openai/gpt-4.1"}},
}
if overrides:
data.update(overrides)
config_path = tmp_path / "config.json"
config_path.write_text(json.dumps(data))
return config_path
def test_from_config_missing_file():
    """from_config raises FileNotFoundError for a path that does not exist."""
    missing_path = "/nonexistent/config.json"
    with pytest.raises(FileNotFoundError):
        Nanobot.from_config(missing_path)
def test_from_config_creates_instance(tmp_path):
    """from_config builds a bot whose loop exists and uses the given workspace."""
    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)

    assert bot._loop is not None
    assert bot._loop.workspace == tmp_path
def test_from_config_default_path():
    """from_config() without arguments asks the loader for path ``None``."""
    from nanobot.config.schema import Config

    with patch("nanobot.config.loader.load_config") as load_config_mock:
        with patch("nanobot.nanobot._make_provider") as make_provider_mock:
            load_config_mock.return_value = Config()

            # Stub provider exposing only the attributes configured here.
            fake_provider = MagicMock()
            fake_provider.get_default_model.return_value = "test"
            fake_provider.generation.max_tokens = 4096
            make_provider_mock.return_value = fake_provider

            Nanobot.from_config()
            load_config_mock.assert_called_once_with(None)
@pytest.mark.asyncio
async def test_run_returns_result(tmp_path):
    """run() returns a RunResult carrying the loop's reply content."""
    from nanobot.bus.events import OutboundMessage

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    reply = OutboundMessage(channel="cli", chat_id="direct", content="Hello back!")
    bot._loop.process_direct = AsyncMock(return_value=reply)

    outcome = await bot.run("hi")

    assert isinstance(outcome, RunResult)
    assert outcome.content == "Hello back!"
    # With no session_key passed, the loop is called with "sdk:default".
    bot._loop.process_direct.assert_awaited_once_with("hi", session_key="sdk:default")
@pytest.mark.asyncio
async def test_run_with_hooks(tmp_path):
    """run() accepts a user hook and leaves ``_extra_hooks`` empty afterwards."""
    from nanobot.agent.hook import AgentHook, AgentHookContext
    from nanobot.bus.events import OutboundMessage

    class TestHook(AgentHook):
        async def before_iteration(self, context: AgentHookContext) -> None:
            pass

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    reply = OutboundMessage(channel="cli", chat_id="direct", content="done")
    bot._loop.process_direct = AsyncMock(return_value=reply)

    outcome = await bot.run("hi", hooks=[TestHook()])

    assert outcome.content == "done"
    assert bot._loop._extra_hooks == []
@pytest.mark.asyncio
async def test_run_hooks_restored_on_error(tmp_path):
    """When run() raises, the loop gets back the very same ``_extra_hooks`` object."""
    from nanobot.agent.hook import AgentHook

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    bot._loop.process_direct = AsyncMock(side_effect=RuntimeError("boom"))
    hooks_before = bot._loop._extra_hooks

    with pytest.raises(RuntimeError):
        await bot.run("hi", hooks=[AgentHook()])

    # Identity, not equality: the original list object must be put back.
    assert bot._loop._extra_hooks is hooks_before
@pytest.mark.asyncio
async def test_run_none_response(tmp_path):
    """A ``None`` reply from the loop produces a result with empty content."""
    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    bot._loop.process_direct = AsyncMock(return_value=None)

    outcome = await bot.run("hi")

    assert outcome.content == ""
def test_workspace_override(tmp_path):
    """An explicit ``workspace`` argument becomes the loop's workspace."""
    config_file = _write_config(tmp_path)
    workspace_dir = tmp_path / "custom_workspace"
    workspace_dir.mkdir()

    bot = Nanobot.from_config(config_file, workspace=workspace_dir)

    assert bot._loop.workspace == workspace_dir
def test_sdk_make_provider_uses_github_copilot_backend():
    """_make_provider returns a GitHubCopilotProvider for the github-copilot provider."""
    from nanobot.config.schema import Config
    from nanobot.nanobot import _make_provider

    agent_defaults = {
        "provider": "github-copilot",
        "model": "github-copilot/gpt-4.1",
    }
    config = Config.model_validate({"agents": {"defaults": agent_defaults}})

    # AsyncOpenAI is patched out while the provider is constructed.
    with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"):
        provider = _make_provider(config)

    assert provider.__class__.__name__ == "GitHubCopilotProvider"
@pytest.mark.asyncio
async def test_run_custom_session_key(tmp_path):
    """A caller-supplied session_key is forwarded to process_direct unchanged."""
    from nanobot.bus.events import OutboundMessage

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    reply = OutboundMessage(channel="cli", chat_id="direct", content="ok")
    bot._loop.process_direct = AsyncMock(return_value=reply)

    await bot.run("hi", session_key="user-alice")

    bot._loop.process_direct.assert_awaited_once_with("hi", session_key="user-alice")
def test_import_from_top_level():
    """The top-level package exposes the same Nanobot and RunResult objects."""
    import nanobot as package

    assert package.Nanobot is Nanobot
    assert package.RunResult is RunResult
# ---------------------------------------------------------------------------
# RunResult.tools_used / messages — populated from the agent iterations
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_run_populates_tools_used_across_iterations(tmp_path):
    """tools_used lists every tool name from every iteration, in firing order."""
    from nanobot.agent.hook import AgentHookContext
    from nanobot.bus.events import OutboundMessage
    from nanobot.providers.base import ToolCallRequest

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)

    async def fake_process_direct(message, *, session_key):
        # Read the hook list at call time: it holds whatever run() installed.
        installed = bot._loop._extra_hooks
        history = [{"role": "user", "content": message}]

        async def finish_iteration(index, calls):
            # Simulate the end of one agent iteration for every installed hook.
            ctx = AgentHookContext(iteration=index, messages=history)
            ctx.tool_calls = calls
            for hook in installed:
                await hook.after_iteration(ctx)

        await finish_iteration(
            0,
            [
                ToolCallRequest(id="c1", name="read_file", arguments={}),
                ToolCallRequest(id="c2", name="glob", arguments={}),
            ],
        )
        history.append({"role": "assistant", "content": "ok"})
        await finish_iteration(
            1,
            [ToolCallRequest(id="c3", name="web_fetch", arguments={})],
        )
        return OutboundMessage(channel="cli", chat_id="direct", content="final")

    bot._loop.process_direct = fake_process_direct

    outcome = await bot.run("do stuff")

    assert outcome.content == "final"
    assert outcome.tools_used == ["read_file", "glob", "web_fetch"]
@pytest.mark.asyncio
async def test_run_populates_final_messages(tmp_path):
    """messages mirrors the message list seen at the last after_iteration call."""
    from nanobot.agent.hook import AgentHookContext
    from nanobot.bus.events import OutboundMessage

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)

    async def fake_process_direct(message, *, session_key):
        transcript = [
            {"role": "user", "content": message},
            {"role": "assistant", "content": "hi there"},
        ]
        ctx = AgentHookContext(iteration=0, messages=transcript)
        # Drive every hook run() installed through one iteration.
        for hook in bot._loop._extra_hooks:
            await hook.after_iteration(ctx)
        return OutboundMessage(channel="cli", chat_id="direct", content="hi there")

    bot._loop.process_direct = fake_process_direct

    outcome = await bot.run("hello")

    expected_transcript = [
        {"role": "user", "content": "hello"},
        {"role": "assistant", "content": "hi there"},
    ]
    assert outcome.messages == expected_transcript
@pytest.mark.asyncio
async def test_run_no_iterations_leaves_defaults_empty(tmp_path):
    """With no after_iteration call, tools_used and messages both remain []."""
    from nanobot.bus.events import OutboundMessage

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    # An AsyncMock never invokes any hook, so nothing is captured.
    reply = OutboundMessage(channel="cli", chat_id="direct", content="noop")
    bot._loop.process_direct = AsyncMock(return_value=reply)

    outcome = await bot.run("hi")

    assert outcome.tools_used == []
    assert outcome.messages == []
@pytest.mark.asyncio
async def test_run_user_hooks_still_fire_alongside_capture(tmp_path):
    """A user-supplied hook is still installed and called during run()."""
    from nanobot.agent.hook import AgentHook, AgentHookContext
    from nanobot.bus.events import OutboundMessage

    seen_iterations: list[int] = []

    class UserHook(AgentHook):
        async def after_iteration(self, context: AgentHookContext) -> None:
            seen_iterations.append(context.iteration)

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)

    async def fake_process_direct(message, *, session_key):
        extras = bot._loop._extra_hooks
        # Two hooks are expected: the one run() adds plus the user's own.
        assert len(extras) == 2, f"expected capture + user hook, got {len(extras)}"
        ctx = AgentHookContext(iteration=7, messages=[])
        for hook in extras:
            await hook.after_iteration(ctx)
        return OutboundMessage(channel="cli", chat_id="direct", content="ok")

    bot._loop.process_direct = fake_process_direct

    await bot.run("x", hooks=[UserHook()])

    assert seen_iterations == [7]
@pytest.mark.asyncio
async def test_run_restores_extra_hooks_even_on_populated_iterations(tmp_path):
    """Hooks installed before run() are back in place after a run that fired hooks."""
    from nanobot.agent.hook import AgentHook, AgentHookContext
    from nanobot.bus.events import OutboundMessage

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    preinstalled = AgentHook()
    bot._loop._extra_hooks = [preinstalled]

    async def fake_process_direct(message, *, session_key):
        # Fire one iteration so the hooks active during run() are exercised.
        ctx = AgentHookContext(iteration=0, messages=[])
        for hook in bot._loop._extra_hooks:
            await hook.after_iteration(ctx)
        return OutboundMessage(channel="cli", chat_id="direct", content="done")

    bot._loop.process_direct = fake_process_direct

    await bot.run("hello")

    assert bot._loop._extra_hooks == [preinstalled]