fix(agent): prevent duplicate responses when sub-agents complete concurrently

When the main agent spawns multiple sub-agents, each completion
independently triggered a new _dispatch, causing 3-4 user-visible
responses instead of a single comprehensive report.

- Extend _drain_pending to block-wait on pending_queue when sub-agents
  are still running, keeping the runner loop alive for in-order injection
- Pass pending_queue in the system message path so subsequent sub-agent
  results can still be injected mid-turn via a new dispatch
This commit is contained in:
chengyongru 2026-04-22 18:20:36 +08:00
parent 9bf7f3b420
commit da0ebc64fb
2 changed files with 249 additions and 9 deletions

View File

@ -423,15 +423,18 @@ class AgentLoop:
self._set_runtime_checkpoint(session, payload)
async def _drain_pending(*, limit: int = _MAX_INJECTIONS_PER_TURN) -> list[dict[str, Any]]:
"""Non-blocking drain of follow-up messages from the pending queue."""
"""Drain follow-up messages from the pending queue.
When no messages are immediately available but sub-agents
spawned in this dispatch are still running, blocks until at
least one result arrives (or timeout). This keeps the runner
loop alive so subsequent sub-agent completions are consumed
in-order rather than dispatched separately.
"""
if pending_queue is None:
return []
items: list[dict[str, Any]] = []
while len(items) < limit:
try:
pending_msg = pending_queue.get_nowait()
except asyncio.QueueEmpty:
break
def _to_user_message(pending_msg: InboundMessage) -> dict[str, Any]:
content = pending_msg.content
media = pending_msg.media if pending_msg.media else None
if media:
@ -447,7 +450,36 @@ class AgentLoop:
merged: str | list[dict[str, Any]] = f"{runtime_ctx}\n\n{user_content}"
else:
merged = [{"type": "text", "text": runtime_ctx}] + user_content
items.append({"role": "user", "content": merged})
return {"role": "user", "content": merged}
items: list[dict[str, Any]] = []
while len(items) < limit:
try:
items.append(_to_user_message(pending_queue.get_nowait()))
except asyncio.QueueEmpty:
break
# Block if nothing drained but sub-agents spawned in this dispatch
# are still running. Keeps the runner loop alive so subsequent
# completions are injected in-order rather than dispatched separately.
if (not items
and session is not None
and self.subagents.get_running_count_by_session(session.key) > 0):
try:
msg = await asyncio.wait_for(pending_queue.get(), timeout=300)
except asyncio.TimeoutError:
logger.warning(
"Timeout waiting for sub-agent completion in session {}",
session.key,
)
return items
items.append(_to_user_message(msg))
while len(items) < limit:
try:
items.append(_to_user_message(pending_queue.get_nowait()))
except asyncio.QueueEmpty:
break
return items
result = await self.runner.run(AgentRunSpec(
@ -744,6 +776,7 @@ class AgentLoop:
final_content, _, all_msgs, _, _ = await self._run_agent_loop(
messages, session=session, channel=channel, chat_id=chat_id,
message_id=msg.metadata.get("message_id"),
pending_queue=pending_queue,
)
self._save_turn(session, all_msgs, 1 + len(history))
self._clear_runtime_checkpoint(session)

View File

@ -1,8 +1,9 @@
"""Tests for subagent tool registration and wiring."""
import asyncio
import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@ -51,3 +52,209 @@ async def test_subagent_exec_tool_receives_allowed_env_keys(tmp_path):
)
mgr.runner.run.assert_awaited_once()
@pytest.mark.asyncio
async def test_drain_pending_blocks_while_subagents_running(tmp_path):
"""_drain_pending should block when no messages are available but sub-agents are still running."""
from nanobot.agent.loop import AgentLoop
from nanobot.agent.subagent import SubagentManager
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.session.manager import Session
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
pending_queue: asyncio.Queue[InboundMessage] = asyncio.Queue()
session = Session(key="test:drain-block")
injection_callback = None
# Capture the injection_callback that _run_agent_loop creates
original_run = loop.runner.run
async def fake_runner_run(spec):
nonlocal injection_callback
injection_callback = spec.injection_callback
# Simulate: first call to injection_callback should block because
# sub-agents are running and no messages are in the queue yet.
# We'll resolve this from a concurrent task.
return SimpleNamespace(
stop_reason="done",
final_content="done",
error=None,
tool_events=[],
messages=[],
usage={},
had_injections=False,
tools_used=[],
)
loop.runner.run = AsyncMock(side_effect=fake_runner_run)
# Register a running sub-agent in the SubagentManager for this session
async def _hang_forever():
await asyncio.Event().wait()
hang_task = asyncio.create_task(_hang_forever())
loop.subagents._session_tasks.setdefault(session.key, set()).add("sub-drain-1")
loop.subagents._running_tasks["sub-drain-1"] = hang_task
# Run _run_agent_loop — this defines the _drain_pending closure
await loop._run_agent_loop(
[{"role": "user", "content": "test"}],
session=session,
channel="test",
chat_id="c1",
pending_queue=pending_queue,
)
assert injection_callback is not None
# Now test the callback directly
# With sub-agents running and an empty queue, it should block
drain_task = asyncio.create_task(injection_callback())
# Give it a moment to enter the blocking wait
await asyncio.sleep(0.05)
# Should still be running (blocked on pending_queue.get())
assert not drain_task.done(), "drain should block while sub-agents are running"
# Now put a message in the queue (simulating sub-agent completion)
await pending_queue.put(InboundMessage(
sender_id="subagent",
channel="test",
chat_id="c1",
content="Sub-agent result",
media=None,
metadata={},
))
# Should unblock and return results
results = await asyncio.wait_for(drain_task, timeout=2.0)
assert len(results) >= 1
assert results[0]["role"] == "user"
assert "Sub-agent result" in str(results[0]["content"])
# Cleanup
hang_task.cancel()
try:
await hang_task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_drain_pending_no_block_when_no_subagents(tmp_path):
"""_drain_pending should not block when no sub-agents are running."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
pending_queue: asyncio.Queue = asyncio.Queue()
injection_callback = None
async def fake_runner_run(spec):
nonlocal injection_callback
injection_callback = spec.injection_callback
return SimpleNamespace(
stop_reason="done",
final_content="done",
error=None,
tool_events=[],
messages=[],
usage={},
had_injections=False,
tools_used=[],
)
loop.runner.run = AsyncMock(side_effect=fake_runner_run)
await loop._run_agent_loop(
[{"role": "user", "content": "test"}],
session=None,
channel="test",
chat_id="c1",
pending_queue=pending_queue,
)
assert injection_callback is not None
# With no sub-agents and empty queue, should return immediately
results = await asyncio.wait_for(injection_callback(), timeout=1.0)
assert results == []
@pytest.mark.asyncio
async def test_drain_pending_timeout(tmp_path):
"""_drain_pending should return empty after timeout when sub-agents hang."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
from nanobot.session.manager import Session
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
pending_queue: asyncio.Queue = asyncio.Queue()
session = Session(key="test:drain-timeout")
injection_callback = None
async def fake_runner_run(spec):
nonlocal injection_callback
injection_callback = spec.injection_callback
return SimpleNamespace(
stop_reason="done",
final_content="done",
error=None,
tool_events=[],
messages=[],
usage={},
had_injections=False,
tools_used=[],
)
loop.runner.run = AsyncMock(side_effect=fake_runner_run)
# Register a "running" sub-agent that will never complete
async def _hang_forever():
await asyncio.Event().wait()
hang_task = asyncio.create_task(_hang_forever())
loop.subagents._session_tasks.setdefault(session.key, set()).add("sub-timeout-1")
loop.subagents._running_tasks["sub-timeout-1"] = hang_task
await loop._run_agent_loop(
[{"role": "user", "content": "test"}],
session=session,
channel="test",
chat_id="c1",
pending_queue=pending_queue,
)
assert injection_callback is not None
# Patch the timeout to be very short for testing
with patch("nanobot.agent.loop.asyncio.wait_for") as mock_wait:
mock_wait.side_effect = asyncio.TimeoutError
results = await injection_callback()
assert results == []
# Cleanup
hang_task.cancel()
try:
await hang_task
except asyncio.CancelledError:
pass