mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-07 02:05:51 +00:00
Replace the asyncio.Semaphore queueing approach with a simple count check in SpawnTool.execute(). When the concurrency limit is reached, the tool returns an error string so the agent can perceive the reason and adjust its behavior instead of silently queueing. - Remove max_concurrent_subagents parameter threading through AgentLoop, commands.py, and nanobot.py - SubagentManager reads the limit directly from AgentDefaults - SpawnTool checks get_running_count() before calling spawn() - Simplify tests to verify rejection behavior
446 lines
14 KiB
Python
446 lines
14 KiB
Python
"""Tests for subagent tool registration and wiring."""
|
|
|
|
import asyncio
|
|
import time
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.config.schema import AgentDefaults
|
|
|
|
_MAX_TOOL_RESULT_CHARS = AgentDefaults().max_tool_result_chars
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subagent_exec_tool_receives_allowed_env_keys(tmp_path):
|
|
"""allowed_env_keys from ExecToolConfig must be forwarded to the subagent's ExecTool."""
|
|
from nanobot.agent.subagent import SubagentManager, SubagentStatus
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.config.schema import ExecToolConfig
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
exec_config=ExecToolConfig(allowed_env_keys=["GOPATH", "JAVA_HOME"]),
|
|
)
|
|
mgr._announce_result = AsyncMock()
|
|
|
|
async def fake_run(spec):
|
|
exec_tool = spec.tools.get("exec")
|
|
assert exec_tool is not None
|
|
assert exec_tool.allowed_env_keys == ["GOPATH", "JAVA_HOME"]
|
|
return SimpleNamespace(
|
|
stop_reason="done",
|
|
final_content="done",
|
|
error=None,
|
|
tool_events=[],
|
|
)
|
|
|
|
mgr.runner.run = AsyncMock(side_effect=fake_run)
|
|
|
|
status = SubagentStatus(
|
|
task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic()
|
|
)
|
|
await mgr._run_subagent(
|
|
"sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status
|
|
)
|
|
|
|
mgr.runner.run.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subagent_uses_configured_max_iterations(tmp_path):
|
|
"""Subagents should honor the configured tool-iteration limit."""
|
|
from nanobot.agent.subagent import SubagentManager, SubagentStatus
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
max_iterations=37,
|
|
)
|
|
mgr._announce_result = AsyncMock()
|
|
|
|
async def fake_run(spec):
|
|
assert spec.max_iterations == 37
|
|
return SimpleNamespace(
|
|
stop_reason="done",
|
|
final_content="done",
|
|
error=None,
|
|
tool_events=[],
|
|
)
|
|
|
|
mgr.runner.run = AsyncMock(side_effect=fake_run)
|
|
|
|
status = SubagentStatus(
|
|
task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic()
|
|
)
|
|
await mgr._run_subagent(
|
|
"sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status
|
|
)
|
|
|
|
mgr.runner.run.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_spawn_tool_rejects_when_at_concurrency_limit(tmp_path):
|
|
"""SpawnTool should return an error string when the concurrency limit is reached."""
|
|
from nanobot.agent.subagent import SubagentManager
|
|
from nanobot.agent.tools.spawn import SpawnTool
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
)
|
|
mgr._announce_result = AsyncMock()
|
|
|
|
# Block the first subagent so it stays "running"
|
|
release = asyncio.Event()
|
|
|
|
async def fake_run(spec):
|
|
await release.wait()
|
|
return SimpleNamespace(
|
|
stop_reason="done",
|
|
final_content="done",
|
|
error=None,
|
|
tool_events=[],
|
|
)
|
|
|
|
mgr.runner.run = AsyncMock(side_effect=fake_run)
|
|
|
|
tool = SpawnTool(mgr)
|
|
tool.set_context("test", "c1", "test:c1")
|
|
|
|
# First spawn succeeds
|
|
result = await tool.execute(task="first task")
|
|
assert "started" in result
|
|
|
|
# Second spawn should be rejected (default limit is 1)
|
|
result = await tool.execute(task="second task")
|
|
assert "Cannot spawn subagent" in result
|
|
assert "concurrency limit reached" in result
|
|
|
|
# Release the first subagent
|
|
release.set()
|
|
# Allow cleanup
|
|
await asyncio.gather(*mgr._running_tasks.values(), return_exceptions=True)
|
|
|
|
|
|
def test_subagent_default_max_concurrent_matches_agent_defaults(tmp_path):
|
|
"""Direct SubagentManager construction should use the agent default concurrency limit."""
|
|
from nanobot.agent.subagent import SubagentManager
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
)
|
|
|
|
assert mgr.max_concurrent_subagents == AgentDefaults().max_concurrent_subagents
|
|
|
|
|
|
def test_subagent_default_max_iterations_matches_agent_defaults(tmp_path):
|
|
"""Direct SubagentManager construction should use the agent default limit."""
|
|
from nanobot.agent.subagent import SubagentManager
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
)
|
|
|
|
assert mgr.max_iterations == AgentDefaults().max_tool_iterations
|
|
|
|
|
|
def test_agent_loop_passes_max_iterations_to_subagents(tmp_path):
|
|
"""AgentLoop's configured limit should be shared with spawned subagents."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(
|
|
bus=bus,
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
model="test-model",
|
|
max_iterations=42,
|
|
)
|
|
|
|
assert loop.subagents.max_iterations == 42
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_agent_loop_syncs_updated_max_iterations_before_run(tmp_path):
|
|
"""Runtime max_iterations changes should be reflected before tool execution."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(
|
|
bus=bus,
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
model="test-model",
|
|
max_iterations=42,
|
|
)
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
|
|
async def fake_run(spec):
|
|
assert spec.max_iterations == 55
|
|
assert loop.subagents.max_iterations == 55
|
|
return SimpleNamespace(
|
|
stop_reason="done",
|
|
final_content="done",
|
|
error=None,
|
|
tool_events=[],
|
|
messages=[],
|
|
usage={},
|
|
had_injections=False,
|
|
tools_used=[],
|
|
)
|
|
|
|
loop.runner.run = AsyncMock(side_effect=fake_run)
|
|
loop.max_iterations = 55
|
|
|
|
await loop._run_agent_loop([])
|
|
|
|
loop.runner.run.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_drain_pending_blocks_while_subagents_running(tmp_path):
|
|
"""_drain_pending should block when no messages are available but sub-agents are still running."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.events import InboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.session.manager import Session
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
|
|
|
|
pending_queue: asyncio.Queue[InboundMessage] = asyncio.Queue()
|
|
session = Session(key="test:drain-block")
|
|
injection_callback = None
|
|
|
|
# Capture the injection_callback that _run_agent_loop creates
|
|
async def fake_runner_run(spec):
|
|
nonlocal injection_callback
|
|
injection_callback = spec.injection_callback
|
|
|
|
# Simulate: first call to injection_callback should block because
|
|
# sub-agents are running and no messages are in the queue yet.
|
|
# We'll resolve this from a concurrent task.
|
|
return SimpleNamespace(
|
|
stop_reason="done",
|
|
final_content="done",
|
|
error=None,
|
|
tool_events=[],
|
|
messages=[],
|
|
usage={},
|
|
had_injections=False,
|
|
tools_used=[],
|
|
)
|
|
|
|
loop.runner.run = AsyncMock(side_effect=fake_runner_run)
|
|
|
|
# Register a running sub-agent in the SubagentManager for this session
|
|
async def _hang_forever():
|
|
await asyncio.Event().wait()
|
|
|
|
hang_task = asyncio.create_task(_hang_forever())
|
|
loop.subagents._session_tasks.setdefault(session.key, set()).add("sub-drain-1")
|
|
loop.subagents._running_tasks["sub-drain-1"] = hang_task
|
|
|
|
# Run _run_agent_loop — this defines the _drain_pending closure
|
|
await loop._run_agent_loop(
|
|
[{"role": "user", "content": "test"}],
|
|
session=session,
|
|
channel="test",
|
|
chat_id="c1",
|
|
pending_queue=pending_queue,
|
|
)
|
|
|
|
assert injection_callback is not None
|
|
|
|
# Now test the callback directly
|
|
# With sub-agents running and an empty queue, it should block
|
|
drain_task = asyncio.create_task(injection_callback())
|
|
|
|
# Give it a moment to enter the blocking wait
|
|
await asyncio.sleep(0.05)
|
|
|
|
# Should still be running (blocked on pending_queue.get())
|
|
assert not drain_task.done(), "drain should block while sub-agents are running"
|
|
|
|
# Now put a message in the queue (simulating sub-agent completion)
|
|
await pending_queue.put(InboundMessage(
|
|
sender_id="subagent",
|
|
channel="test",
|
|
chat_id="c1",
|
|
content="Sub-agent result",
|
|
media=None,
|
|
metadata={},
|
|
))
|
|
|
|
# Should unblock and return results
|
|
results = await asyncio.wait_for(drain_task, timeout=2.0)
|
|
assert len(results) >= 1
|
|
assert results[0]["role"] == "user"
|
|
assert "Sub-agent result" in str(results[0]["content"])
|
|
|
|
# Cleanup
|
|
hang_task.cancel()
|
|
try:
|
|
await hang_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_drain_pending_no_block_when_no_subagents(tmp_path):
|
|
"""_drain_pending should not block when no sub-agents are running."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
|
|
|
|
pending_queue: asyncio.Queue = asyncio.Queue()
|
|
injection_callback = None
|
|
|
|
async def fake_runner_run(spec):
|
|
nonlocal injection_callback
|
|
injection_callback = spec.injection_callback
|
|
return SimpleNamespace(
|
|
stop_reason="done",
|
|
final_content="done",
|
|
error=None,
|
|
tool_events=[],
|
|
messages=[],
|
|
usage={},
|
|
had_injections=False,
|
|
tools_used=[],
|
|
)
|
|
|
|
loop.runner.run = AsyncMock(side_effect=fake_runner_run)
|
|
|
|
await loop._run_agent_loop(
|
|
[{"role": "user", "content": "test"}],
|
|
session=None,
|
|
channel="test",
|
|
chat_id="c1",
|
|
pending_queue=pending_queue,
|
|
)
|
|
|
|
assert injection_callback is not None
|
|
|
|
# With no sub-agents and empty queue, should return immediately
|
|
results = await asyncio.wait_for(injection_callback(), timeout=1.0)
|
|
assert results == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_drain_pending_timeout(tmp_path):
|
|
"""_drain_pending should return empty after timeout when sub-agents hang."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.session.manager import Session
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
|
|
|
|
pending_queue: asyncio.Queue = asyncio.Queue()
|
|
session = Session(key="test:drain-timeout")
|
|
injection_callback = None
|
|
|
|
async def fake_runner_run(spec):
|
|
nonlocal injection_callback
|
|
injection_callback = spec.injection_callback
|
|
return SimpleNamespace(
|
|
stop_reason="done",
|
|
final_content="done",
|
|
error=None,
|
|
tool_events=[],
|
|
messages=[],
|
|
usage={},
|
|
had_injections=False,
|
|
tools_used=[],
|
|
)
|
|
|
|
loop.runner.run = AsyncMock(side_effect=fake_runner_run)
|
|
|
|
# Register a "running" sub-agent that will never complete
|
|
async def _hang_forever():
|
|
await asyncio.Event().wait()
|
|
|
|
hang_task = asyncio.create_task(_hang_forever())
|
|
loop.subagents._session_tasks.setdefault(session.key, set()).add("sub-timeout-1")
|
|
loop.subagents._running_tasks["sub-timeout-1"] = hang_task
|
|
|
|
await loop._run_agent_loop(
|
|
[{"role": "user", "content": "test"}],
|
|
session=session,
|
|
channel="test",
|
|
chat_id="c1",
|
|
pending_queue=pending_queue,
|
|
)
|
|
|
|
assert injection_callback is not None
|
|
|
|
# Patch the timeout to be very short for testing
|
|
with patch("nanobot.agent.loop.asyncio.wait_for") as mock_wait:
|
|
mock_wait.side_effect = asyncio.TimeoutError
|
|
results = await injection_callback()
|
|
assert results == []
|
|
|
|
# Cleanup
|
|
hang_task.cancel()
|
|
try:
|
|
await hang_task
|
|
except asyncio.CancelledError:
|
|
pass
|