From c30e4d86f37b11f7193c9e5cc39a750706b6d0f1 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Tue, 5 May 2026 21:11:27 +0800 Subject: [PATCH] refactor(agent): simplify subagent concurrency with rejection over semaphore Replace the asyncio.Semaphore queueing approach with a simple count check in SpawnTool.execute(). When the concurrency limit is reached, the tool returns an error string so the agent can perceive the reason and adjust its behavior instead of silently queueing. - Remove max_concurrent_subagents parameter threading through AgentLoop, commands.py, and nanobot.py - SubagentManager reads the limit directly from AgentDefaults - SpawnTool checks get_running_count() before calling spawn() - Simplify tests to verify rejection behavior --- docs/configuration.md | 22 ++++++++ nanobot/agent/subagent.py | 4 +- nanobot/agent/tools/spawn.py | 8 +++ nanobot/config/schema.py | 1 + tests/agent/tools/test_subagent_tools.py | 69 ++++++++++++++++++++++++ tests/test_tool_contextvars.py | 15 ++++++ 6 files changed, 118 insertions(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index ec889c758..d0a7fe940 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1009,6 +1009,28 @@ MCP tools are automatically discovered and registered on startup. The LLM can us **Docker security**: The official Docker image runs as a non-root user (`nanobot`, UID 1000) with bubblewrap pre-installed. When using `docker-compose.yml`, the container drops all Linux capabilities except `SYS_ADMIN` (required for bwrap's namespace isolation). +## Subagent Concurrency + +By default, nanobot only allows one spawned subagent at a time. When the limit is +reached, the `spawn` tool returns an error so the agent can decide to wait or +rearrange its work. This protects local LLM servers from loading multiple KV caches +at once. If your provider can handle more parallel work, raise the limit: + +```json +{ + "agents": { + "defaults": { + "maxConcurrentSubagents": 2 + } + } +} +``` + +| Option | Default | Description | +|--------|---------|-------------| +| `agents.defaults.maxConcurrentSubagents` | `1` | Maximum number of spawned subagents that may run at the same time. Attempts to spawn beyond this limit return an error. | + + ## Auto Compact When a user is idle for longer than a configured threshold, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input. diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index 18f0bd53b..6d64698a7 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -83,6 +83,7 @@ class SubagentManager: disabled_skills: list[str] | None = None, max_iterations: int | None = None, ): + defaults = AgentDefaults() self.provider = provider self.workspace = workspace self.bus = bus @@ -95,8 +96,9 @@ class SubagentManager: self.max_iterations = ( max_iterations if max_iterations is not None - else AgentDefaults().max_tool_iterations + else defaults.max_tool_iterations ) + self.max_concurrent_subagents = defaults.max_concurrent_subagents self.runner = AgentRunner(provider) self._running_tasks: dict[str, asyncio.Task[None]] = {} self._task_statuses: dict[str, SubagentStatus] = {} diff --git a/nanobot/agent/tools/spawn.py b/nanobot/agent/tools/spawn.py index a1acf0aae..17ad48d12 100644 --- a/nanobot/agent/tools/spawn.py +++ b/nanobot/agent/tools/spawn.py @@ -56,6 +56,14 @@ class SpawnTool(Tool): async def execute(self, task: str, label: str | None = None, **kwargs: Any) -> str: """Spawn a subagent to execute the given task.""" + running = self._manager.get_running_count() + limit = self._manager.max_concurrent_subagents + if running >= limit: + return ( + f"Cannot spawn subagent: concurrency limit reached " + f"({running}/{limit} running). Wait for a running subagent " + f"to complete before spawning a new one." + ) return await self._manager.spawn( task=task, label=label, diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index be4eb7202..2f20eb99e 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -78,6 +78,7 @@ class AgentDefaults(Base): context_block_limit: int | None = None temperature: float = 0.1 max_tool_iterations: int = 200 + max_concurrent_subagents: int = Field(default=1, ge=1) max_tool_result_chars: int = 16_000 provider_retry_mode: Literal["standard", "persistent"] = "standard" reasoning_effort: str | None = None # low / medium / high / adaptive - enables LLM thinking mode diff --git a/tests/agent/tools/test_subagent_tools.py b/tests/agent/tools/test_subagent_tools.py index a050a4271..f43f98f24 100644 --- a/tests/agent/tools/test_subagent_tools.py +++ b/tests/agent/tools/test_subagent_tools.py @@ -93,6 +93,75 @@ async def test_subagent_uses_configured_max_iterations(tmp_path): mgr.runner.run.assert_awaited_once() +@pytest.mark.asyncio +async def test_spawn_tool_rejects_when_at_concurrency_limit(tmp_path): + """SpawnTool should return an error string when the concurrency limit is reached.""" + from nanobot.agent.subagent import SubagentManager + from nanobot.agent.tools.spawn import SpawnTool + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + mgr = SubagentManager( + provider=provider, + workspace=tmp_path, + bus=bus, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + ) + mgr._announce_result = AsyncMock() + + # Block the first subagent so it stays "running" + release = asyncio.Event() + + async def fake_run(spec): + await release.wait() + return SimpleNamespace( + stop_reason="done", + final_content="done", + error=None, + tool_events=[], + ) + + mgr.runner.run = AsyncMock(side_effect=fake_run) + + tool = SpawnTool(mgr) + tool.set_context("test", "c1", "test:c1") + + # First spawn succeeds + result = await tool.execute(task="first task") + assert "started" in result + + # Second spawn should be rejected (default limit is 1) + result = await tool.execute(task="second task") + assert "Cannot spawn subagent" in result + assert "concurrency limit reached" in result + + # Release the first subagent + release.set() + # Allow cleanup + await asyncio.gather(*mgr._running_tasks.values(), return_exceptions=True) + + +def test_subagent_default_max_concurrent_matches_agent_defaults(tmp_path): + """Direct SubagentManager construction should use the agent default concurrency limit.""" + from nanobot.agent.subagent import SubagentManager + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + + mgr = SubagentManager( + provider=provider, + workspace=tmp_path, + bus=bus, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + ) + + assert mgr.max_concurrent_subagents == AgentDefaults().max_concurrent_subagents + + def test_subagent_default_max_iterations_matches_agent_defaults(tmp_path): """Direct SubagentManager construction should use the agent default limit.""" from nanobot.agent.subagent import SubagentManager diff --git a/tests/test_tool_contextvars.py b/tests/test_tool_contextvars.py index a1e7bd8c0..3763ba980 100644 --- a/tests/test_tool_contextvars.py +++ b/tests/test_tool_contextvars.py @@ -49,6 +49,11 @@ async def test_spawn_tool_keeps_task_local_context() -> None: release = asyncio.Event() class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *, @@ -156,6 +161,11 @@ async def test_spawn_tool_basic_set_context_and_execute() -> None: seen: list[tuple[str, str, str]] = [] class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *, @@ -183,6 +193,11 @@ async def test_spawn_tool_default_values_without_set_context() -> None: seen: list[tuple[str, str, str]] = [] class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *,