From 2103cd5602cbc10762f995e87dd9bfb32c7ef8dd Mon Sep 17 00:00:00 2001 From: MrBob Date: Sun, 3 May 2026 15:08:39 -0300 Subject: [PATCH] feat(agent): limit subagent concurrency --- docs/configuration.md | 21 +++++ nanobot/agent/loop.py | 7 ++ nanobot/agent/subagent.py | 27 ++++++- nanobot/cli/commands.py | 3 + nanobot/config/schema.py | 1 + nanobot/nanobot.py | 1 + tests/agent/tools/test_subagent_tools.py | 98 ++++++++++++++++++++++++ 7 files changed, 156 insertions(+), 2 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index ec889c758..f1bb56a99 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1009,6 +1009,27 @@ MCP tools are automatically discovered and registered on startup. The LLM can us **Docker security**: The official Docker image runs as a non-root user (`nanobot`, UID 1000) with bubblewrap pre-installed. When using `docker-compose.yml`, the container drops all Linux capabilities except `SYS_ADMIN` (required for bwrap's namespace isolation). +## Subagent Concurrency + +By default, nanobot runs one spawned subagent at a time. This protects local LLM +servers from loading multiple KV caches at once. If your provider can handle more +parallel work, raise the limit: + +```json +{ + "agents": { + "defaults": { + "maxConcurrentSubagents": 2 + } + } +} +``` + +| Option | Default | Description | +|--------|---------|-------------| +| `agents.defaults.maxConcurrentSubagents` | `1` | Maximum number of spawned subagents that may execute at the same time. Extra subagents stay queued until a slot is free. | + + ## Auto Compact When a user is idle for longer than a configured threshold, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input. diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 1a8042fed..72d8adc0f 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -189,6 +189,7 @@ class AgentLoop: workspace: Path, model: str | None = None, max_iterations: int | None = None, + max_concurrent_subagents: int | None = None, context_window_tokens: int | None = None, context_block_limit: int | None = None, max_tool_result_chars: int | None = None, @@ -225,6 +226,11 @@ class AgentLoop: self.max_iterations = ( max_iterations if max_iterations is not None else defaults.max_tool_iterations ) + self.max_concurrent_subagents = ( + max_concurrent_subagents + if max_concurrent_subagents is not None + else defaults.max_concurrent_subagents + ) self.context_window_tokens = ( context_window_tokens if context_window_tokens is not None @@ -263,6 +269,7 @@ class AgentLoop: restrict_to_workspace=restrict_to_workspace, disabled_skills=disabled_skills, max_iterations=self.max_iterations, + max_concurrent_subagents=self.max_concurrent_subagents, ) self._unified_session = unified_session self._max_messages = max_messages if max_messages > 0 else 120 diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index 18f0bd53b..276f8bcc2 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -82,7 +82,9 @@ class SubagentManager: restrict_to_workspace: bool = False, disabled_skills: list[str] | None = None, max_iterations: int | None = None, + max_concurrent_subagents: int | None = None, ): + defaults = AgentDefaults() self.provider = provider self.workspace = workspace self.bus = bus @@ -95,8 +97,14 @@ class SubagentManager: self.max_iterations = ( max_iterations if max_iterations is not None - else AgentDefaults().max_tool_iterations + else defaults.max_tool_iterations ) + self.max_concurrent_subagents = ( + max_concurrent_subagents + if max_concurrent_subagents is not None + else defaults.max_concurrent_subagents + ) + self._concurrency_gate = asyncio.Semaphore(self.max_concurrent_subagents) self.runner = AgentRunner(provider) self._running_tasks: dict[str, asyncio.Task[None]] = {} self._task_statuses: dict[str, SubagentStatus] = {} @@ -130,7 +138,9 @@ class SubagentManager: self._task_statuses[task_id] = status bg_task = asyncio.create_task( - self._run_subagent(task_id, task, display_label, origin, status, origin_message_id) + self._run_subagent_limited( + task_id, task, display_label, origin, status, origin_message_id + ) ) self._running_tasks[task_id] = bg_task if session_key: @@ -149,6 +159,19 @@ class SubagentManager: logger.info("Spawned subagent [{}]: {}", task_id, display_label) return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes." + async def _run_subagent_limited( + self, + task_id: str, + task: str, + label: str, + origin: dict[str, str], + status: SubagentStatus, + origin_message_id: str | None = None, + ) -> None: + """Wait for an execution slot, then run the subagent.""" + async with self._concurrency_gate: + await self._run_subagent(task_id, task, label, origin, status, origin_message_id) + async def _run_subagent( self, task_id: str, diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 23f409b53..8f13c4135 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -514,6 +514,7 @@ def serve( workspace=runtime_config.workspace_path, model=runtime_config.agents.defaults.model, max_iterations=runtime_config.agents.defaults.max_tool_iterations, + max_concurrent_subagents=runtime_config.agents.defaults.max_concurrent_subagents, context_window_tokens=runtime_config.agents.defaults.context_window_tokens, context_block_limit=runtime_config.agents.defaults.context_block_limit, max_tool_result_chars=runtime_config.agents.defaults.max_tool_result_chars, @@ -627,6 +628,7 @@ def _run_gateway( workspace=config.workspace_path, model=provider_snapshot.model, max_iterations=config.agents.defaults.max_tool_iterations, + max_concurrent_subagents=config.agents.defaults.max_concurrent_subagents, context_window_tokens=provider_snapshot.context_window_tokens, web_config=config.tools.web, context_block_limit=config.agents.defaults.context_block_limit, @@ -1019,6 +1021,7 @@ def agent( workspace=config.workspace_path, model=config.agents.defaults.model, max_iterations=config.agents.defaults.max_tool_iterations, + max_concurrent_subagents=config.agents.defaults.max_concurrent_subagents, context_window_tokens=config.agents.defaults.context_window_tokens, web_config=config.tools.web, context_block_limit=config.agents.defaults.context_block_limit, diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index be4eb7202..2f20eb99e 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -78,6 +78,7 @@ class AgentDefaults(Base): context_block_limit: int | None = None temperature: float = 0.1 max_tool_iterations: int = 200 + max_concurrent_subagents: int = Field(default=1, ge=1) max_tool_result_chars: int = 16_000 provider_retry_mode: Literal["standard", "persistent"] = "standard" reasoning_effort: str | None = None # low / medium / high / adaptive - enables LLM thinking mode diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py index 5e5857595..05e8c16d0 100644 --- a/nanobot/nanobot.py +++ b/nanobot/nanobot.py @@ -72,6 +72,7 @@ class Nanobot: workspace=config.workspace_path, model=defaults.model, max_iterations=defaults.max_tool_iterations, + max_concurrent_subagents=defaults.max_concurrent_subagents, context_window_tokens=defaults.context_window_tokens, context_block_limit=defaults.context_block_limit, max_tool_result_chars=defaults.max_tool_result_chars, diff --git a/tests/agent/tools/test_subagent_tools.py b/tests/agent/tools/test_subagent_tools.py index a050a4271..ef90572e3 100644 --- a/tests/agent/tools/test_subagent_tools.py +++ b/tests/agent/tools/test_subagent_tools.py @@ -93,6 +93,84 @@ async def test_subagent_uses_configured_max_iterations(tmp_path): mgr.runner.run.assert_awaited_once() +@pytest.mark.asyncio +async def test_subagent_manager_limits_concurrent_spawn_runs(tmp_path): + """Spawned subagents should not execute more than the configured limit at once.""" + from nanobot.agent.subagent import SubagentManager + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + mgr = SubagentManager( + provider=provider, + workspace=tmp_path, + bus=bus, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + max_concurrent_subagents=1, + ) + mgr._announce_result = AsyncMock() + + active = 0 + max_active = 0 + started_count = 0 + starts: asyncio.Queue[int] = asyncio.Queue() + releases = [asyncio.Event(), asyncio.Event()] + + async def fake_run(spec): + nonlocal active, max_active, started_count + index = started_count + started_count += 1 + active += 1 + max_active = max(max_active, active) + await starts.put(index) + await releases[index].wait() + active -= 1 + return SimpleNamespace( + stop_reason="done", + final_content="done", + error=None, + tool_events=[], + ) + + mgr.runner.run = AsyncMock(side_effect=fake_run) + + await mgr.spawn("first task", session_key="test:session") + await mgr.spawn("second task", session_key="test:session") + tasks = list(mgr._running_tasks.values()) + + first_index = await asyncio.wait_for(starts.get(), timeout=0.5) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(starts.get(), timeout=0.05) + + releases[first_index].set() + second_index = await asyncio.wait_for(starts.get(), timeout=0.5) + releases[second_index].set() + await asyncio.gather(*tasks) + + assert max_active == 1 + assert mgr.runner.run.await_count == 2 + + +def test_subagent_default_max_concurrent_matches_agent_defaults(tmp_path): + """Direct SubagentManager construction should use the agent default concurrency limit.""" + from nanobot.agent.subagent import SubagentManager + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + + mgr = SubagentManager( + provider=provider, + workspace=tmp_path, + bus=bus, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + ) + + assert mgr.max_concurrent_subagents == AgentDefaults().max_concurrent_subagents + + def test_subagent_default_max_iterations_matches_agent_defaults(tmp_path): """Direct SubagentManager construction should use the agent default limit.""" from nanobot.agent.subagent import SubagentManager @@ -132,6 +210,26 @@ def test_agent_loop_passes_max_iterations_to_subagents(tmp_path): assert loop.subagents.max_iterations == 42 +def test_agent_loop_passes_max_concurrent_subagents_to_subagents(tmp_path): + """AgentLoop's configured subagent concurrency limit should be shared with subagents.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + + loop = AgentLoop( + bus=bus, + provider=provider, + workspace=tmp_path, + model="test-model", + max_concurrent_subagents=2, + ) + + assert loop.subagents.max_concurrent_subagents == 2 + + @pytest.mark.asyncio async def test_agent_loop_syncs_updated_max_iterations_before_run(tmp_path): """Runtime max_iterations changes should be reflected before tool execution."""