From 3baa869fdb4bf67c65417f6e069bb26c7609e7cd Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Tue, 5 May 2026 21:11:27 +0800 Subject: [PATCH] refactor(agent): simplify subagent concurrency with rejection over semaphore Replace the asyncio.Semaphore queueing approach with a simple count check in SpawnTool.execute(). When the concurrency limit is reached, the tool returns an error string so the agent can perceive the reason and adjust its behavior instead of silently queueing. - Remove max_concurrent_subagents parameter threading through AgentLoop, commands.py, and nanobot.py - SubagentManager reads the limit directly from AgentDefaults - SpawnTool checks get_running_count() before calling spawn() - Simplify tests to verify rejection behavior --- docs/configuration.md | 9 ++-- nanobot/agent/loop.py | 7 --- nanobot/agent/subagent.py | 25 +-------- nanobot/agent/tools/spawn.py | 8 +++ nanobot/cli/commands.py | 3 -- nanobot/nanobot.py | 1 - tests/agent/tools/test_subagent_tools.py | 67 +++++++----------------- tests/test_tool_contextvars.py | 15 ++++++ 8 files changed, 49 insertions(+), 86 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index f1bb56a99..d0a7fe940 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1011,9 +1011,10 @@ MCP tools are automatically discovered and registered on startup. The LLM can us ## Subagent Concurrency -By default, nanobot runs one spawned subagent at a time. This protects local LLM -servers from loading multiple KV caches at once. If your provider can handle more -parallel work, raise the limit: +By default, nanobot only allows one spawned subagent at a time. When the limit is +reached, the `spawn` tool returns an error so the agent can decide to wait or +rearrange its work. This protects local LLM servers from loading multiple KV caches +at once. If your provider can handle more parallel work, raise the limit: ```json { @@ -1027,7 +1028,7 @@ parallel work, raise the limit: | Option | Default | Description | |--------|---------|-------------| -| `agents.defaults.maxConcurrentSubagents` | `1` | Maximum number of spawned subagents that may execute at the same time. Extra subagents stay queued until a slot is free. | +| `agents.defaults.maxConcurrentSubagents` | `1` | Maximum number of spawned subagents that may run at the same time. Attempts to spawn beyond this limit return an error. | ## Auto Compact diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 72d8adc0f..1a8042fed 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -189,7 +189,6 @@ class AgentLoop: workspace: Path, model: str | None = None, max_iterations: int | None = None, - max_concurrent_subagents: int | None = None, context_window_tokens: int | None = None, context_block_limit: int | None = None, max_tool_result_chars: int | None = None, @@ -226,11 +225,6 @@ class AgentLoop: self.max_iterations = ( max_iterations if max_iterations is not None else defaults.max_tool_iterations ) - self.max_concurrent_subagents = ( - max_concurrent_subagents - if max_concurrent_subagents is not None - else defaults.max_concurrent_subagents - ) self.context_window_tokens = ( context_window_tokens if context_window_tokens is not None @@ -269,7 +263,6 @@ class AgentLoop: restrict_to_workspace=restrict_to_workspace, disabled_skills=disabled_skills, max_iterations=self.max_iterations, - max_concurrent_subagents=self.max_concurrent_subagents, ) self._unified_session = unified_session self._max_messages = max_messages if max_messages > 0 else 120 diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index 276f8bcc2..6d64698a7 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -82,7 +82,6 @@ class SubagentManager: restrict_to_workspace: bool = False, disabled_skills: list[str] | None = None, max_iterations: int | None = None, - max_concurrent_subagents: int | None = None, ): defaults = AgentDefaults() self.provider = provider @@ -99,12 +98,7 @@ class SubagentManager: if max_iterations is not None else defaults.max_tool_iterations ) - self.max_concurrent_subagents = ( - max_concurrent_subagents - if max_concurrent_subagents is not None - else defaults.max_concurrent_subagents - ) - self._concurrency_gate = asyncio.Semaphore(self.max_concurrent_subagents) + self.max_concurrent_subagents = defaults.max_concurrent_subagents self.runner = AgentRunner(provider) self._running_tasks: dict[str, asyncio.Task[None]] = {} self._task_statuses: dict[str, SubagentStatus] = {} @@ -138,9 +132,7 @@ class SubagentManager: self._task_statuses[task_id] = status bg_task = asyncio.create_task( - self._run_subagent_limited( - task_id, task, display_label, origin, status, origin_message_id - ) + self._run_subagent(task_id, task, display_label, origin, status, origin_message_id) ) self._running_tasks[task_id] = bg_task if session_key: @@ -159,19 +151,6 @@ class SubagentManager: logger.info("Spawned subagent [{}]: {}", task_id, display_label) return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes." - async def _run_subagent_limited( - self, - task_id: str, - task: str, - label: str, - origin: dict[str, str], - status: SubagentStatus, - origin_message_id: str | None = None, - ) -> None: - """Wait for an execution slot, then run the subagent.""" - async with self._concurrency_gate: - await self._run_subagent(task_id, task, label, origin, status, origin_message_id) - async def _run_subagent( self, task_id: str, diff --git a/nanobot/agent/tools/spawn.py b/nanobot/agent/tools/spawn.py index a1acf0aae..17ad48d12 100644 --- a/nanobot/agent/tools/spawn.py +++ b/nanobot/agent/tools/spawn.py @@ -56,6 +56,14 @@ class SpawnTool(Tool): async def execute(self, task: str, label: str | None = None, **kwargs: Any) -> str: """Spawn a subagent to execute the given task.""" + running = self._manager.get_running_count() + limit = self._manager.max_concurrent_subagents + if running >= limit: + return ( + f"Cannot spawn subagent: concurrency limit reached " + f"({running}/{limit} running). Wait for a running subagent " + f"to complete before spawning a new one." + ) return await self._manager.spawn( task=task, label=label, diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 8f13c4135..23f409b53 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -514,7 +514,6 @@ def serve( workspace=runtime_config.workspace_path, model=runtime_config.agents.defaults.model, max_iterations=runtime_config.agents.defaults.max_tool_iterations, - max_concurrent_subagents=runtime_config.agents.defaults.max_concurrent_subagents, context_window_tokens=runtime_config.agents.defaults.context_window_tokens, context_block_limit=runtime_config.agents.defaults.context_block_limit, max_tool_result_chars=runtime_config.agents.defaults.max_tool_result_chars, @@ -628,7 +627,6 @@ def _run_gateway( workspace=config.workspace_path, model=provider_snapshot.model, max_iterations=config.agents.defaults.max_tool_iterations, - max_concurrent_subagents=config.agents.defaults.max_concurrent_subagents, context_window_tokens=provider_snapshot.context_window_tokens, web_config=config.tools.web, context_block_limit=config.agents.defaults.context_block_limit, @@ -1021,7 +1019,6 @@ def agent( workspace=config.workspace_path, model=config.agents.defaults.model, max_iterations=config.agents.defaults.max_tool_iterations, - max_concurrent_subagents=config.agents.defaults.max_concurrent_subagents, context_window_tokens=config.agents.defaults.context_window_tokens, web_config=config.tools.web, context_block_limit=config.agents.defaults.context_block_limit, diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py index 05e8c16d0..5e5857595 100644 --- a/nanobot/nanobot.py +++ b/nanobot/nanobot.py @@ -72,7 +72,6 @@ class Nanobot: workspace=config.workspace_path, model=defaults.model, max_iterations=defaults.max_tool_iterations, - max_concurrent_subagents=defaults.max_concurrent_subagents, context_window_tokens=defaults.context_window_tokens, context_block_limit=defaults.context_block_limit, max_tool_result_chars=defaults.max_tool_result_chars, diff --git a/tests/agent/tools/test_subagent_tools.py b/tests/agent/tools/test_subagent_tools.py index ef90572e3..f43f98f24 100644 --- a/tests/agent/tools/test_subagent_tools.py +++ b/tests/agent/tools/test_subagent_tools.py @@ -94,9 +94,10 @@ async def test_subagent_uses_configured_max_iterations(tmp_path): @pytest.mark.asyncio -async def test_subagent_manager_limits_concurrent_spawn_runs(tmp_path): - """Spawned subagents should not execute more than the configured limit at once.""" +async def test_spawn_tool_rejects_when_at_concurrency_limit(tmp_path): + """SpawnTool should return an error string when the concurrency limit is reached.""" from nanobot.agent.subagent import SubagentManager + from nanobot.agent.tools.spawn import SpawnTool from nanobot.bus.queue import MessageBus bus = MessageBus() @@ -107,25 +108,14 @@ async def test_subagent_manager_limits_concurrent_spawn_runs(tmp_path): workspace=tmp_path, bus=bus, max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, - max_concurrent_subagents=1, ) mgr._announce_result = AsyncMock() - active = 0 - max_active = 0 - started_count = 0 - starts: asyncio.Queue[int] = asyncio.Queue() - releases = [asyncio.Event(), asyncio.Event()] + # Block the first subagent so it stays "running" + release = asyncio.Event() async def fake_run(spec): - nonlocal active, max_active, started_count - index = started_count - started_count += 1 - active += 1 - max_active = max(max_active, active) - await starts.put(index) - await releases[index].wait() - active -= 1 + await release.wait() return SimpleNamespace( stop_reason="done", final_content="done", @@ -135,21 +125,22 @@ async def test_subagent_manager_limits_concurrent_spawn_runs(tmp_path): mgr.runner.run = AsyncMock(side_effect=fake_run) - await mgr.spawn("first task", session_key="test:session") - await mgr.spawn("second task", session_key="test:session") - tasks = list(mgr._running_tasks.values()) + tool = SpawnTool(mgr) + tool.set_context("test", "c1", "test:c1") - first_index = await asyncio.wait_for(starts.get(), timeout=0.5) - with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(starts.get(), timeout=0.05) + # First spawn succeeds + result = await tool.execute(task="first task") + assert "started" in result - releases[first_index].set() - second_index = await asyncio.wait_for(starts.get(), timeout=0.5) - releases[second_index].set() - await asyncio.gather(*tasks) + # Second spawn should be rejected (default limit is 1) + result = await tool.execute(task="second task") + assert "Cannot spawn subagent" in result + assert "concurrency limit reached" in result - assert max_active == 1 - assert mgr.runner.run.await_count == 2 + # Release the first subagent + release.set() + # Allow cleanup + await asyncio.gather(*mgr._running_tasks.values(), return_exceptions=True) def test_subagent_default_max_concurrent_matches_agent_defaults(tmp_path): @@ -210,26 +201,6 @@ def test_agent_loop_passes_max_iterations_to_subagents(tmp_path): assert loop.subagents.max_iterations == 42 -def test_agent_loop_passes_max_concurrent_subagents_to_subagents(tmp_path): - """AgentLoop's configured subagent concurrency limit should be shared with subagents.""" - from nanobot.agent.loop import AgentLoop - from nanobot.bus.queue import MessageBus - - bus = MessageBus() - provider = MagicMock() - provider.get_default_model.return_value = "test-model" - - loop = AgentLoop( - bus=bus, - provider=provider, - workspace=tmp_path, - model="test-model", - max_concurrent_subagents=2, - ) - - assert loop.subagents.max_concurrent_subagents == 2 - - @pytest.mark.asyncio async def test_agent_loop_syncs_updated_max_iterations_before_run(tmp_path): """Runtime max_iterations changes should be reflected before tool execution.""" diff --git a/tests/test_tool_contextvars.py b/tests/test_tool_contextvars.py index a1e7bd8c0..3763ba980 100644 --- a/tests/test_tool_contextvars.py +++ b/tests/test_tool_contextvars.py @@ -49,6 +49,11 @@ async def test_spawn_tool_keeps_task_local_context() -> None: release = asyncio.Event() class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *, @@ -156,6 +161,11 @@ async def test_spawn_tool_basic_set_context_and_execute() -> None: seen: list[tuple[str, str, str]] = [] class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *, @@ -183,6 +193,11 @@ async def test_spawn_tool_default_values_without_set_context() -> None: seen: list[tuple[str, str, str]] = [] class _Manager: + max_concurrent_subagents = 1 + + def get_running_count(self) -> int: + return 0 + async def spawn( self, *,