mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-19 16:12:30 +00:00
feat(agent): limit subagent concurrency
This commit is contained in:
parent
5b45191cd9
commit
2103cd5602
@ -1009,6 +1009,27 @@ MCP tools are automatically discovered and registered on startup. The LLM can us
|
||||
**Docker security**: The official Docker image runs as a non-root user (`nanobot`, UID 1000) with bubblewrap pre-installed. When using `docker-compose.yml`, the container drops all Linux capabilities except `SYS_ADMIN` (required for bwrap's namespace isolation).
|
||||
|
||||
|
||||
## Subagent Concurrency
|
||||
|
||||
By default, nanobot runs one spawned subagent at a time. This protects local LLM
|
||||
servers from loading multiple KV caches at once. If your provider can handle more
|
||||
parallel work, raise the limit:
|
||||
|
||||
```json
|
||||
{
|
||||
"agents": {
|
||||
"defaults": {
|
||||
"maxConcurrentSubagents": 2
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
| Option | Default | Description |
|
||||
|--------|---------|-------------|
|
||||
| `agents.defaults.maxConcurrentSubagents` | `1` | Maximum number of spawned subagents that may execute at the same time. Extra subagents stay queued until a slot is free. |
|
||||
|
||||
|
||||
## Auto Compact
|
||||
|
||||
When a user is idle for longer than a configured threshold, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input.
|
||||
|
||||
@ -189,6 +189,7 @@ class AgentLoop:
|
||||
workspace: Path,
|
||||
model: str | None = None,
|
||||
max_iterations: int | None = None,
|
||||
max_concurrent_subagents: int | None = None,
|
||||
context_window_tokens: int | None = None,
|
||||
context_block_limit: int | None = None,
|
||||
max_tool_result_chars: int | None = None,
|
||||
@ -225,6 +226,11 @@ class AgentLoop:
|
||||
self.max_iterations = (
|
||||
max_iterations if max_iterations is not None else defaults.max_tool_iterations
|
||||
)
|
||||
self.max_concurrent_subagents = (
|
||||
max_concurrent_subagents
|
||||
if max_concurrent_subagents is not None
|
||||
else defaults.max_concurrent_subagents
|
||||
)
|
||||
self.context_window_tokens = (
|
||||
context_window_tokens
|
||||
if context_window_tokens is not None
|
||||
@ -263,6 +269,7 @@ class AgentLoop:
|
||||
restrict_to_workspace=restrict_to_workspace,
|
||||
disabled_skills=disabled_skills,
|
||||
max_iterations=self.max_iterations,
|
||||
max_concurrent_subagents=self.max_concurrent_subagents,
|
||||
)
|
||||
self._unified_session = unified_session
|
||||
self._max_messages = max_messages if max_messages > 0 else 120
|
||||
|
||||
@ -82,7 +82,9 @@ class SubagentManager:
|
||||
restrict_to_workspace: bool = False,
|
||||
disabled_skills: list[str] | None = None,
|
||||
max_iterations: int | None = None,
|
||||
max_concurrent_subagents: int | None = None,
|
||||
):
|
||||
defaults = AgentDefaults()
|
||||
self.provider = provider
|
||||
self.workspace = workspace
|
||||
self.bus = bus
|
||||
@ -95,8 +97,14 @@ class SubagentManager:
|
||||
self.max_iterations = (
|
||||
max_iterations
|
||||
if max_iterations is not None
|
||||
else AgentDefaults().max_tool_iterations
|
||||
else defaults.max_tool_iterations
|
||||
)
|
||||
self.max_concurrent_subagents = (
|
||||
max_concurrent_subagents
|
||||
if max_concurrent_subagents is not None
|
||||
else defaults.max_concurrent_subagents
|
||||
)
|
||||
self._concurrency_gate = asyncio.Semaphore(self.max_concurrent_subagents)
|
||||
self.runner = AgentRunner(provider)
|
||||
self._running_tasks: dict[str, asyncio.Task[None]] = {}
|
||||
self._task_statuses: dict[str, SubagentStatus] = {}
|
||||
@ -130,7 +138,9 @@ class SubagentManager:
|
||||
self._task_statuses[task_id] = status
|
||||
|
||||
bg_task = asyncio.create_task(
|
||||
self._run_subagent(task_id, task, display_label, origin, status, origin_message_id)
|
||||
self._run_subagent_limited(
|
||||
task_id, task, display_label, origin, status, origin_message_id
|
||||
)
|
||||
)
|
||||
self._running_tasks[task_id] = bg_task
|
||||
if session_key:
|
||||
@ -149,6 +159,19 @@ class SubagentManager:
|
||||
logger.info("Spawned subagent [{}]: {}", task_id, display_label)
|
||||
return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."
|
||||
|
||||
async def _run_subagent_limited(
|
||||
self,
|
||||
task_id: str,
|
||||
task: str,
|
||||
label: str,
|
||||
origin: dict[str, str],
|
||||
status: SubagentStatus,
|
||||
origin_message_id: str | None = None,
|
||||
) -> None:
|
||||
"""Wait for an execution slot, then run the subagent."""
|
||||
async with self._concurrency_gate:
|
||||
await self._run_subagent(task_id, task, label, origin, status, origin_message_id)
|
||||
|
||||
async def _run_subagent(
|
||||
self,
|
||||
task_id: str,
|
||||
|
||||
@ -514,6 +514,7 @@ def serve(
|
||||
workspace=runtime_config.workspace_path,
|
||||
model=runtime_config.agents.defaults.model,
|
||||
max_iterations=runtime_config.agents.defaults.max_tool_iterations,
|
||||
max_concurrent_subagents=runtime_config.agents.defaults.max_concurrent_subagents,
|
||||
context_window_tokens=runtime_config.agents.defaults.context_window_tokens,
|
||||
context_block_limit=runtime_config.agents.defaults.context_block_limit,
|
||||
max_tool_result_chars=runtime_config.agents.defaults.max_tool_result_chars,
|
||||
@ -627,6 +628,7 @@ def _run_gateway(
|
||||
workspace=config.workspace_path,
|
||||
model=provider_snapshot.model,
|
||||
max_iterations=config.agents.defaults.max_tool_iterations,
|
||||
max_concurrent_subagents=config.agents.defaults.max_concurrent_subagents,
|
||||
context_window_tokens=provider_snapshot.context_window_tokens,
|
||||
web_config=config.tools.web,
|
||||
context_block_limit=config.agents.defaults.context_block_limit,
|
||||
@ -1019,6 +1021,7 @@ def agent(
|
||||
workspace=config.workspace_path,
|
||||
model=config.agents.defaults.model,
|
||||
max_iterations=config.agents.defaults.max_tool_iterations,
|
||||
max_concurrent_subagents=config.agents.defaults.max_concurrent_subagents,
|
||||
context_window_tokens=config.agents.defaults.context_window_tokens,
|
||||
web_config=config.tools.web,
|
||||
context_block_limit=config.agents.defaults.context_block_limit,
|
||||
|
||||
@ -78,6 +78,7 @@ class AgentDefaults(Base):
|
||||
context_block_limit: int | None = None
|
||||
temperature: float = 0.1
|
||||
max_tool_iterations: int = 200
|
||||
max_concurrent_subagents: int = Field(default=1, ge=1)
|
||||
max_tool_result_chars: int = 16_000
|
||||
provider_retry_mode: Literal["standard", "persistent"] = "standard"
|
||||
reasoning_effort: str | None = None # low / medium / high / adaptive - enables LLM thinking mode
|
||||
|
||||
@ -72,6 +72,7 @@ class Nanobot:
|
||||
workspace=config.workspace_path,
|
||||
model=defaults.model,
|
||||
max_iterations=defaults.max_tool_iterations,
|
||||
max_concurrent_subagents=defaults.max_concurrent_subagents,
|
||||
context_window_tokens=defaults.context_window_tokens,
|
||||
context_block_limit=defaults.context_block_limit,
|
||||
max_tool_result_chars=defaults.max_tool_result_chars,
|
||||
|
||||
@ -93,6 +93,84 @@ async def test_subagent_uses_configured_max_iterations(tmp_path):
|
||||
mgr.runner.run.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subagent_manager_limits_concurrent_spawn_runs(tmp_path):
|
||||
"""Spawned subagents should not execute more than the configured limit at once."""
|
||||
from nanobot.agent.subagent import SubagentManager
|
||||
from nanobot.bus.queue import MessageBus
|
||||
|
||||
bus = MessageBus()
|
||||
provider = MagicMock()
|
||||
provider.get_default_model.return_value = "test-model"
|
||||
mgr = SubagentManager(
|
||||
provider=provider,
|
||||
workspace=tmp_path,
|
||||
bus=bus,
|
||||
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
||||
max_concurrent_subagents=1,
|
||||
)
|
||||
mgr._announce_result = AsyncMock()
|
||||
|
||||
active = 0
|
||||
max_active = 0
|
||||
started_count = 0
|
||||
starts: asyncio.Queue[int] = asyncio.Queue()
|
||||
releases = [asyncio.Event(), asyncio.Event()]
|
||||
|
||||
async def fake_run(spec):
|
||||
nonlocal active, max_active, started_count
|
||||
index = started_count
|
||||
started_count += 1
|
||||
active += 1
|
||||
max_active = max(max_active, active)
|
||||
await starts.put(index)
|
||||
await releases[index].wait()
|
||||
active -= 1
|
||||
return SimpleNamespace(
|
||||
stop_reason="done",
|
||||
final_content="done",
|
||||
error=None,
|
||||
tool_events=[],
|
||||
)
|
||||
|
||||
mgr.runner.run = AsyncMock(side_effect=fake_run)
|
||||
|
||||
await mgr.spawn("first task", session_key="test:session")
|
||||
await mgr.spawn("second task", session_key="test:session")
|
||||
tasks = list(mgr._running_tasks.values())
|
||||
|
||||
first_index = await asyncio.wait_for(starts.get(), timeout=0.5)
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await asyncio.wait_for(starts.get(), timeout=0.05)
|
||||
|
||||
releases[first_index].set()
|
||||
second_index = await asyncio.wait_for(starts.get(), timeout=0.5)
|
||||
releases[second_index].set()
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
assert max_active == 1
|
||||
assert mgr.runner.run.await_count == 2
|
||||
|
||||
|
||||
def test_subagent_default_max_concurrent_matches_agent_defaults(tmp_path):
|
||||
"""Direct SubagentManager construction should use the agent default concurrency limit."""
|
||||
from nanobot.agent.subagent import SubagentManager
|
||||
from nanobot.bus.queue import MessageBus
|
||||
|
||||
bus = MessageBus()
|
||||
provider = MagicMock()
|
||||
provider.get_default_model.return_value = "test-model"
|
||||
|
||||
mgr = SubagentManager(
|
||||
provider=provider,
|
||||
workspace=tmp_path,
|
||||
bus=bus,
|
||||
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
||||
)
|
||||
|
||||
assert mgr.max_concurrent_subagents == AgentDefaults().max_concurrent_subagents
|
||||
|
||||
|
||||
def test_subagent_default_max_iterations_matches_agent_defaults(tmp_path):
|
||||
"""Direct SubagentManager construction should use the agent default limit."""
|
||||
from nanobot.agent.subagent import SubagentManager
|
||||
@ -132,6 +210,26 @@ def test_agent_loop_passes_max_iterations_to_subagents(tmp_path):
|
||||
assert loop.subagents.max_iterations == 42
|
||||
|
||||
|
||||
def test_agent_loop_passes_max_concurrent_subagents_to_subagents(tmp_path):
|
||||
"""AgentLoop's configured subagent concurrency limit should be shared with subagents."""
|
||||
from nanobot.agent.loop import AgentLoop
|
||||
from nanobot.bus.queue import MessageBus
|
||||
|
||||
bus = MessageBus()
|
||||
provider = MagicMock()
|
||||
provider.get_default_model.return_value = "test-model"
|
||||
|
||||
loop = AgentLoop(
|
||||
bus=bus,
|
||||
provider=provider,
|
||||
workspace=tmp_path,
|
||||
model="test-model",
|
||||
max_concurrent_subagents=2,
|
||||
)
|
||||
|
||||
assert loop.subagents.max_concurrent_subagents == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_loop_syncs_updated_max_iterations_before_run(tmp_path):
|
||||
"""Runtime max_iterations changes should be reflected before tool execution."""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user