"""Tests for AgentRunner tool execution: batching, concurrency, exclusive tools.""" from __future__ import annotations import asyncio from unittest.mock import AsyncMock, MagicMock import pytest from nanobot.agent.tools.base import Tool from nanobot.agent.tools.registry import ToolRegistry from nanobot.config.schema import AgentDefaults from nanobot.providers.base import LLMResponse, ToolCallRequest _MAX_TOOL_RESULT_CHARS = AgentDefaults().max_tool_result_chars class _DelayTool(Tool): def __init__( self, name: str, *, delay: float, read_only: bool, shared_events: list[str], exclusive: bool = False, ): self._name = name self._delay = delay self._read_only = read_only self._shared_events = shared_events self._exclusive = exclusive @property def name(self) -> str: return self._name @property def description(self) -> str: return self._name @property def parameters(self) -> dict: return {"type": "object", "properties": {}, "required": []} @property def read_only(self) -> bool: return self._read_only @property def exclusive(self) -> bool: return self._exclusive async def execute(self, **kwargs): self._shared_events.append(f"start:{self._name}") await asyncio.sleep(self._delay) self._shared_events.append(f"end:{self._name}") return self._name @pytest.mark.asyncio async def test_runner_batches_read_only_tools_before_exclusive_work(): from nanobot.agent.runner import AgentRunSpec, AgentRunner tools = ToolRegistry() shared_events: list[str] = [] read_a = _DelayTool("read_a", delay=0.05, read_only=True, shared_events=shared_events) read_b = _DelayTool("read_b", delay=0.05, read_only=True, shared_events=shared_events) write_a = _DelayTool("write_a", delay=0.01, read_only=False, shared_events=shared_events) tools.register(read_a) tools.register(read_b) tools.register(write_a) runner = AgentRunner(MagicMock()) await runner._execute_tools( AgentRunSpec( initial_messages=[], tools=tools, model="test-model", max_iterations=1, max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, concurrent_tools=True, ), [ ToolCallRequest(id="ro1", name="read_a", arguments={}), ToolCallRequest(id="ro2", name="read_b", arguments={}), ToolCallRequest(id="rw1", name="write_a", arguments={}), ], {}, {}, ) assert shared_events[0:2] == ["start:read_a", "start:read_b"] assert "end:read_a" in shared_events and "end:read_b" in shared_events assert shared_events.index("end:read_a") < shared_events.index("start:write_a") assert shared_events.index("end:read_b") < shared_events.index("start:write_a") assert shared_events[-2:] == ["start:write_a", "end:write_a"] @pytest.mark.asyncio async def test_runner_does_not_batch_exclusive_read_only_tools(): from nanobot.agent.runner import AgentRunSpec, AgentRunner tools = ToolRegistry() shared_events: list[str] = [] read_a = _DelayTool("read_a", delay=0.03, read_only=True, shared_events=shared_events) read_b = _DelayTool("read_b", delay=0.03, read_only=True, shared_events=shared_events) ddg_like = _DelayTool( "ddg_like", delay=0.01, read_only=True, shared_events=shared_events, exclusive=True, ) tools.register(read_a) tools.register(ddg_like) tools.register(read_b) runner = AgentRunner(MagicMock()) await runner._execute_tools( AgentRunSpec( initial_messages=[], tools=tools, model="test-model", max_iterations=1, max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, concurrent_tools=True, ), [ ToolCallRequest(id="ro1", name="read_a", arguments={}), ToolCallRequest(id="ddg1", name="ddg_like", arguments={}), ToolCallRequest(id="ro2", name="read_b", arguments={}), ], {}, {}, ) assert shared_events[0] == "start:read_a" assert shared_events.index("end:read_a") < shared_events.index("start:ddg_like") assert shared_events.index("end:ddg_like") < shared_events.index("start:read_b") @pytest.mark.asyncio async def test_runner_blocks_repeated_external_fetches(): from nanobot.agent.runner import AgentRunSpec, AgentRunner provider = MagicMock() captured_final_call: list[dict] = [] call_count = {"n": 0} async def chat_with_retry(*, messages, **kwargs): call_count["n"] += 1 if call_count["n"] <= 3: return LLMResponse( content="working", tool_calls=[ToolCallRequest(id=f"call_{call_count['n']}", name="web_fetch", arguments={"url": "https://example.com"})], usage={}, ) captured_final_call[:] = messages return LLMResponse(content="done", tool_calls=[], usage={}) provider.chat_with_retry = chat_with_retry tools = MagicMock() tools.get_definitions.return_value = [] tools.execute = AsyncMock(return_value="page content") runner = AgentRunner(provider) result = await runner.run(AgentRunSpec( initial_messages=[{"role": "user", "content": "research task"}], tools=tools, model="test-model", max_iterations=4, max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, )) assert result.final_content == "done" assert tools.execute.await_count == 2 blocked_tool_message = [ msg for msg in captured_final_call if msg.get("role") == "tool" and msg.get("tool_call_id") == "call_3" ][0] assert "repeated external lookup blocked" in blocked_tool_message["content"]