"""Tests for the shared agent runner and its integration contracts.""" from __future__ import annotations from unittest.mock import AsyncMock, MagicMock, patch import pytest from nanobot.providers.base import LLMResponse, ToolCallRequest def _make_loop(tmp_path): from nanobot.agent.loop import AgentLoop from nanobot.bus.queue import MessageBus bus = MessageBus() provider = MagicMock() provider.get_default_model.return_value = "test-model" with patch("nanobot.agent.loop.ContextBuilder"), \ patch("nanobot.agent.loop.SessionManager"), \ patch("nanobot.agent.loop.SubagentManager") as MockSubMgr: MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0) loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path) return loop @pytest.mark.asyncio async def test_runner_preserves_reasoning_fields_and_tool_results(): from nanobot.agent.runner import AgentRunSpec, AgentRunner provider = MagicMock() captured_second_call: list[dict] = [] call_count = {"n": 0} async def chat_with_retry(*, messages, **kwargs): call_count["n"] += 1 if call_count["n"] == 1: return LLMResponse( content="thinking", tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={"path": "."})], reasoning_content="hidden reasoning", thinking_blocks=[{"type": "thinking", "thinking": "step"}], usage={"prompt_tokens": 5, "completion_tokens": 3}, ) captured_second_call[:] = messages return LLMResponse(content="done", tool_calls=[], usage={}) provider.chat_with_retry = chat_with_retry tools = MagicMock() tools.get_definitions.return_value = [] tools.execute = AsyncMock(return_value="tool result") runner = AgentRunner(provider) result = await runner.run(AgentRunSpec( initial_messages=[ {"role": "system", "content": "system"}, {"role": "user", "content": "do task"}, ], tools=tools, model="test-model", max_iterations=3, )) assert result.final_content == "done" assert result.tools_used == ["list_dir"] assert result.tool_events == [ {"name": "list_dir", "status": "ok", "detail": "tool result"} ] assistant_messages = [ msg for msg in captured_second_call if msg.get("role") == "assistant" and msg.get("tool_calls") ] assert len(assistant_messages) == 1 assert assistant_messages[0]["reasoning_content"] == "hidden reasoning" assert assistant_messages[0]["thinking_blocks"] == [{"type": "thinking", "thinking": "step"}] assert any( msg.get("role") == "tool" and msg.get("content") == "tool result" for msg in captured_second_call ) @pytest.mark.asyncio async def test_runner_returns_max_iterations_fallback(): from nanobot.agent.runner import AgentRunSpec, AgentRunner provider = MagicMock() provider.chat_with_retry = AsyncMock(return_value=LLMResponse( content="still working", tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={"path": "."})], )) tools = MagicMock() tools.get_definitions.return_value = [] tools.execute = AsyncMock(return_value="tool result") runner = AgentRunner(provider) result = await runner.run(AgentRunSpec( initial_messages=[], tools=tools, model="test-model", max_iterations=2, )) assert result.stop_reason == "max_iterations" assert result.final_content == ( "I reached the maximum number of tool call iterations (2) " "without completing the task. You can try breaking the task into smaller steps." ) @pytest.mark.asyncio async def test_runner_returns_structured_tool_error(): from nanobot.agent.runner import AgentRunSpec, AgentRunner provider = MagicMock() provider.chat_with_retry = AsyncMock(return_value=LLMResponse( content="working", tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={})], )) tools = MagicMock() tools.get_definitions.return_value = [] tools.execute = AsyncMock(side_effect=RuntimeError("boom")) runner = AgentRunner(provider) result = await runner.run(AgentRunSpec( initial_messages=[], tools=tools, model="test-model", max_iterations=2, fail_on_tool_error=True, )) assert result.stop_reason == "tool_error" assert result.error == "Error: RuntimeError: boom" assert result.tool_events == [ {"name": "list_dir", "status": "error", "detail": "boom"} ] @pytest.mark.asyncio async def test_loop_max_iterations_message_stays_stable(tmp_path): loop = _make_loop(tmp_path) loop.provider.chat_with_retry = AsyncMock(return_value=LLMResponse( content="working", tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={})], )) loop.tools.get_definitions = MagicMock(return_value=[]) loop.tools.execute = AsyncMock(return_value="ok") loop.max_iterations = 2 final_content, _, _ = await loop._run_agent_loop([]) assert final_content == ( "I reached the maximum number of tool call iterations (2) " "without completing the task. You can try breaking the task into smaller steps." ) @pytest.mark.asyncio async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, monkeypatch): from nanobot.agent.subagent import SubagentManager from nanobot.bus.queue import MessageBus bus = MessageBus() provider = MagicMock() provider.get_default_model.return_value = "test-model" provider.chat_with_retry = AsyncMock(return_value=LLMResponse( content="working", tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={})], )) mgr = SubagentManager(provider=provider, workspace=tmp_path, bus=bus) mgr._announce_result = AsyncMock() async def fake_execute(self, name, arguments): return "tool result" monkeypatch.setattr("nanobot.agent.tools.registry.ToolRegistry.execute", fake_execute) await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) mgr._announce_result.assert_awaited_once() args = mgr._announce_result.await_args.args assert args[3] == "Task completed but no final response was generated." assert args[5] == "ok"