diff --git a/docs/configuration.md b/docs/configuration.md index f1bb56a99..d0a7fe940 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1011,9 +1011,10 @@ MCP tools are automatically discovered and registered on startup. The LLM can us ## Subagent Concurrency -By default, nanobot runs one spawned subagent at a time. This protects local LLM -servers from loading multiple KV caches at once. If your provider can handle more -parallel work, raise the limit: +By default, nanobot only allows one spawned subagent at a time. When the limit is +reached, the `spawn` tool returns an error so the agent can decide to wait or +rearrange its work. This protects local LLM servers from loading multiple KV caches +at once. If your provider can handle more parallel work, raise the limit: ```json { @@ -1027,7 +1028,7 @@ parallel work, raise the limit: | Option | Default | Description | |--------|---------|-------------| -| `agents.defaults.maxConcurrentSubagents` | `1` | Maximum number of spawned subagents that may execute at the same time. Extra subagents stay queued until a slot is free. | +| `agents.defaults.maxConcurrentSubagents` | `1` | Maximum number of spawned subagents that may run at the same time. Attempts to spawn beyond this limit return an error. | ## Auto Compact diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 72d8adc0f..1a8042fed 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -189,7 +189,6 @@ class AgentLoop: workspace: Path, model: str | None = None, max_iterations: int | None = None, - max_concurrent_subagents: int | None = None, context_window_tokens: int | None = None, context_block_limit: int | None = None, max_tool_result_chars: int | None = None, @@ -226,11 +225,6 @@ class AgentLoop: self.max_iterations = ( max_iterations if max_iterations is not None else defaults.max_tool_iterations ) - self.max_concurrent_subagents = ( - max_concurrent_subagents - if max_concurrent_subagents is not None - else defaults.max_concurrent_subagents - ) self.context_window_tokens = ( context_window_tokens if context_window_tokens is not None @@ -269,7 +263,6 @@ class AgentLoop: restrict_to_workspace=restrict_to_workspace, disabled_skills=disabled_skills, max_iterations=self.max_iterations, - max_concurrent_subagents=self.max_concurrent_subagents, ) self._unified_session = unified_session self._max_messages = max_messages if max_messages > 0 else 120 diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index 276f8bcc2..6d64698a7 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -82,7 +82,6 @@ class SubagentManager: restrict_to_workspace: bool = False, disabled_skills: list[str] | None = None, max_iterations: int | None = None, - max_concurrent_subagents: int | None = None, ): defaults = AgentDefaults() self.provider = provider @@ -99,12 +98,7 @@ class SubagentManager: if max_iterations is not None else defaults.max_tool_iterations ) - self.max_concurrent_subagents = ( - max_concurrent_subagents - if max_concurrent_subagents is not None - else defaults.max_concurrent_subagents - ) - self._concurrency_gate = asyncio.Semaphore(self.max_concurrent_subagents) + self.max_concurrent_subagents = defaults.max_concurrent_subagents self.runner = AgentRunner(provider) self._running_tasks: dict[str, asyncio.Task[None]] = {} self._task_statuses: dict[str, SubagentStatus] = {} @@ -138,9 +132,7 @@ class SubagentManager: self._task_statuses[task_id] = status bg_task = asyncio.create_task( - self._run_subagent_limited( - task_id, task, display_label, origin, status, origin_message_id - ) + self._run_subagent(task_id, task, display_label, origin, status, origin_message_id) ) self._running_tasks[task_id] = bg_task if session_key: @@ -159,19 +151,6 @@ class SubagentManager: logger.info("Spawned subagent [{}]: {}", task_id, display_label) return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes." - async def _run_subagent_limited( - self, - task_id: str, - task: str, - label: str, - origin: dict[str, str], - status: SubagentStatus, - origin_message_id: str | None = None, - ) -> None: - """Wait for an execution slot, then run the subagent.""" - async with self._concurrency_gate: - await self._run_subagent(task_id, task, label, origin, status, origin_message_id) - async def _run_subagent( self, task_id: str, diff --git a/nanobot/agent/tools/spawn.py b/nanobot/agent/tools/spawn.py index a1acf0aae..17ad48d12 100644 --- a/nanobot/agent/tools/spawn.py +++ b/nanobot/agent/tools/spawn.py @@ -56,6 +56,14 @@ class SpawnTool(Tool): async def execute(self, task: str, label: str | None = None, **kwargs: Any) -> str: """Spawn a subagent to execute the given task.""" + running = self._manager.get_running_count() + limit = self._manager.max_concurrent_subagents + if running >= limit: + return ( + f"Cannot spawn subagent: concurrency limit reached " + f"({running}/{limit} running). Wait for a running subagent " + f"to complete before spawning a new one." + ) return await self._manager.spawn( task=task, label=label, diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 8f13c4135..23f409b53 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -514,7 +514,6 @@ def serve( workspace=runtime_config.workspace_path, model=runtime_config.agents.defaults.model, max_iterations=runtime_config.agents.defaults.max_tool_iterations, - max_concurrent_subagents=runtime_config.agents.defaults.max_concurrent_subagents, context_window_tokens=runtime_config.agents.defaults.context_window_tokens, context_block_limit=runtime_config.agents.defaults.context_block_limit, max_tool_result_chars=runtime_config.agents.defaults.max_tool_result_chars, @@ -628,7 +627,6 @@ def _run_gateway( workspace=config.workspace_path, model=provider_snapshot.model, max_iterations=config.agents.defaults.max_tool_iterations, - max_concurrent_subagents=config.agents.defaults.max_concurrent_subagents, context_window_tokens=provider_snapshot.context_window_tokens, web_config=config.tools.web, context_block_limit=config.agents.defaults.context_block_limit, @@ -1021,7 +1019,6 @@ def agent( workspace=config.workspace_path, model=config.agents.defaults.model, max_iterations=config.agents.defaults.max_tool_iterations, - max_concurrent_subagents=config.agents.defaults.max_concurrent_subagents, context_window_tokens=config.agents.defaults.context_window_tokens, web_config=config.tools.web, context_block_limit=config.agents.defaults.context_block_limit, diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py index 05e8c16d0..5e5857595 100644 --- a/nanobot/nanobot.py +++ b/nanobot/nanobot.py @@ -72,7 +72,6 @@ class Nanobot: workspace=config.workspace_path, model=defaults.model, max_iterations=defaults.max_tool_iterations, - max_concurrent_subagents=defaults.max_concurrent_subagents, context_window_tokens=defaults.context_window_tokens, context_block_limit=defaults.context_block_limit, max_tool_result_chars=defaults.max_tool_result_chars, diff --git a/tests/agent/tools/test_subagent_tools.py b/tests/agent/tools/test_subagent_tools.py index ef90572e3..f43f98f24 100644 --- a/tests/agent/tools/test_subagent_tools.py +++ b/tests/agent/tools/test_subagent_tools.py @@ -94,9 +94,10 @@ async def test_subagent_uses_configured_max_iterations(tmp_path): @pytest.mark.asyncio -async def test_subagent_manager_limits_concurrent_spawn_runs(tmp_path): - """Spawned subagents should not execute more than the configured limit at once.""" +async def test_spawn_tool_rejects_when_at_concurrency_limit(tmp_path): + """SpawnTool should return an error string when the concurrency limit is reached.""" from nanobot.agent.subagent import SubagentManager + from nanobot.agent.tools.spawn import SpawnTool from nanobot.bus.queue import MessageBus bus = MessageBus() @@ -107,25 +108,14 @@ async def test_subagent_manager_limits_concurrent_spawn_runs(tmp_path): workspace=tmp_path, bus=bus, max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, - max_concurrent_subagents=1, ) mgr._announce_result = AsyncMock() - active = 0 - max_active = 0 - started_count = 0 - starts: asyncio.Queue[int] = asyncio.Queue() - releases = [asyncio.Event(), asyncio.Event()] + # Block the first subagent so it stays "running" + release = asyncio.Event() async def fake_run(spec): - nonlocal active, max_active, started_count - index = started_count - started_count += 1 - active += 1 - max_active = max(max_active, active) - await starts.put(index) - await releases[index].wait() - active -= 1 + await release.wait() return SimpleNamespace( stop_reason="done", final_content="done", @@ -135,21 +125,22 @@ async def test_subagent_manager_limits_concurrent_spawn_runs(tmp_path): mgr.runner.run = AsyncMock(side_effect=fake_run) - await mgr.spawn("first task", session_key="test:session") - await mgr.spawn("second task", session_key="test:session") - tasks = list(mgr._running_tasks.values()) + tool = SpawnTool(mgr) + tool.set_context("test", "c1", "test:c1") - first_index = await asyncio.wait_for(starts.get(), timeout=0.5) - with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(starts.get(), timeout=0.05) + # First spawn succeeds + result = await tool.execute(task="first task") + assert "started" in result - releases[first_index].set() - second_index = await asyncio.wait_for(starts.get(), timeout=0.5) - releases[second_index].set() - await asyncio.gather(*tasks) + # Second spawn should be rejected (default limit is 1) + result = await tool.execute(task="second task") + assert "Cannot spawn subagent" in result + assert "concurrency limit reached" in result - assert max_active == 1 - assert mgr.runner.run.await_count == 2 + # Release the first subagent + release.set() + # Allow cleanup + await asyncio.gather(*mgr._running_tasks.values(), return_exceptions=True) def test_subagent_default_max_concurrent_matches_agent_defaults(tmp_path): @@ -210,26 +201,6 @@ def test_agent_loop_passes_max_iterations_to_subagents(tmp_path): assert loop.subagents.max_iterations == 42 -def test_agent_loop_passes_max_concurrent_subagents_to_subagents(tmp_path): - """AgentLoop's configured subagent concurrency limit should be shared with subagents.""" - from nanobot.agent.loop import AgentLoop - from nanobot.bus.queue import MessageBus - - bus = MessageBus() - provider = MagicMock() - provider.get_default_model.return_value = "test-model" - - loop = AgentLoop( - bus=bus, - provider=provider, - workspace=tmp_path, - model="test-model", - max_concurrent_subagents=2, - ) - - assert loop.subagents.max_concurrent_subagents == 2 - - @pytest.mark.asyncio async def test_agent_loop_syncs_updated_max_iterations_before_run(tmp_path): """Runtime max_iterations changes should be reflected before tool execution.""" diff --git a/tests/test_tool_contextvars.py b/tests/test_tool_contextvars.py index a1e7bd8c0..3763ba980 100644 --- a/tests/test_tool_contextvars.py +++ b/tests/test_tool_contextvars.py @@ -49,6 +49,11 @@ async def test_spawn_tool_keeps_task_local_context() -> None: release = asyncio.Event() class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *, @@ -156,6 +161,11 @@ async def test_spawn_tool_basic_set_context_and_execute() -> None: seen: list[tuple[str, str, str]] = [] class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *, @@ -183,6 +193,11 @@ async def test_spawn_tool_default_values_without_set_context() -> None: seen: list[tuple[str, str, str]] = [] class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *,