diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py
index 25af137c8..4dcecfc58 100644
--- a/nanobot/agent/loop.py
+++ b/nanobot/agent/loop.py
@@ -423,15 +423,18 @@ class AgentLoop:
             self._set_runtime_checkpoint(session, payload)
 
         async def _drain_pending(*, limit: int = _MAX_INJECTIONS_PER_TURN) -> list[dict[str, Any]]:
-            """Non-blocking drain of follow-up messages from the pending queue."""
+            """Drain follow-up messages from the pending queue.
+
+            When no messages are immediately available but sub-agents
+            spawned in this dispatch are still running, blocks until at
+            least one result arrives (or timeout). This keeps the runner
+            loop alive so subsequent sub-agent completions are consumed
+            in-order rather than dispatched separately.
+            """
             if pending_queue is None:
                 return []
-            items: list[dict[str, Any]] = []
-            while len(items) < limit:
-                try:
-                    pending_msg = pending_queue.get_nowait()
-                except asyncio.QueueEmpty:
-                    break
+
+            def _to_user_message(pending_msg: InboundMessage) -> dict[str, Any]:
                 content = pending_msg.content
                 media = pending_msg.media if pending_msg.media else None
                 if media:
@@ -447,7 +450,40 @@ class AgentLoop:
                     merged: str | list[dict[str, Any]] = f"{runtime_ctx}\n\n{user_content}"
                 else:
                     merged = [{"type": "text", "text": runtime_ctx}] + user_content
-                items.append({"role": "user", "content": merged})
+                return {"role": "user", "content": merged}
+
+            items: list[dict[str, Any]] = []
+
+            def _drain_ready() -> None:
+                """Move already-queued messages into *items* without waiting."""
+                while len(items) < limit:
+                    try:
+                        items.append(_to_user_message(pending_queue.get_nowait()))
+                    except asyncio.QueueEmpty:
+                        break
+
+            _drain_ready()
+
+            # Block if nothing drained but sub-agents spawned in this dispatch
+            # are still running. Keeps the runner loop alive so subsequent
+            # completions are injected in-order rather than dispatched separately.
+            # A non-positive limit can never accept a message, so never wait for one.
+            if (limit > 0
+                    and not items
+                    and session is not None
+                    and self.subagents.get_running_count_by_session(session.key) > 0):
+                try:
+                    msg = await asyncio.wait_for(pending_queue.get(), timeout=300)
+                except asyncio.TimeoutError:
+                    logger.warning(
+                        "Timeout waiting for sub-agent completion in session {}",
+                        session.key,
+                    )
+                    return items
+                items.append(_to_user_message(msg))
+                # Pick up any further results that landed while we were waiting.
+                _drain_ready()
+
             return items
 
         result = await self.runner.run(AgentRunSpec(
@@ -744,6 +780,7 @@ class AgentLoop:
             final_content, _, all_msgs, _, _ = await self._run_agent_loop(
                 messages, session=session, channel=channel, chat_id=chat_id,
                 message_id=msg.metadata.get("message_id"),
+                pending_queue=pending_queue,
             )
             self._save_turn(session, all_msgs, 1 + len(history))
             self._clear_runtime_checkpoint(session)
diff --git a/tests/agent/tools/test_subagent_tools.py b/tests/agent/tools/test_subagent_tools.py
index 045d5a451..bfe845395 100644
--- a/tests/agent/tools/test_subagent_tools.py
+++ b/tests/agent/tools/test_subagent_tools.py
@@ -1,8 +1,9 @@
 """Tests for subagent tool registration and wiring."""
 
+import asyncio
 import time
 from types import SimpleNamespace
-from unittest.mock import AsyncMock, MagicMock
+from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
 
@@ -51,3 +52,158 @@ async def test_subagent_exec_tool_receives_allowed_env_keys(tmp_path):
     )
 
     mgr.runner.run.assert_awaited_once()
+
+
+def _make_drain_loop(tmp_path):
+    """Build an AgentLoop backed by a mocked provider for the drain tests."""
+    from nanobot.agent.loop import AgentLoop
+    from nanobot.bus.queue import MessageBus
+
+    provider = MagicMock()
+    provider.get_default_model.return_value = "test-model"
+    return AgentLoop(bus=MessageBus(), provider=provider, workspace=tmp_path, model="test-model")
+
+
+async def _capture_injection_callback(loop, *, session, pending_queue):
+    """Run ``_run_agent_loop`` with a stubbed runner and return its injection callback."""
+    captured = []
+
+    async def fake_runner_run(spec):
+        # Only record the callback here; the tests invoke it themselves once
+        # the stubbed run has returned.
+        captured.append(spec.injection_callback)
+        return SimpleNamespace(
+            stop_reason="done",
+            final_content="done",
+            error=None,
+            tool_events=[],
+            messages=[],
+            usage={},
+            had_injections=False,
+            tools_used=[],
+        )
+
+    loop.runner.run = AsyncMock(side_effect=fake_runner_run)
+    await loop._run_agent_loop(
+        [{"role": "user", "content": "test"}],
+        session=session,
+        channel="test",
+        chat_id="c1",
+        pending_queue=pending_queue,
+    )
+
+    assert captured and captured[0] is not None
+    return captured[0]
+
+
+def _register_hanging_subagent(loop, session_key, task_id):
+    """Register a never-finishing task as a running sub-agent of *session_key*."""
+
+    async def _hang_forever():
+        await asyncio.Event().wait()
+
+    hang_task = asyncio.create_task(_hang_forever())
+    loop.subagents._session_tasks.setdefault(session_key, set()).add(task_id)
+    loop.subagents._running_tasks[task_id] = hang_task
+    return hang_task
+
+
+async def _cancel_and_wait(task):
+    """Cancel *task* and wait for the cancellation to be processed."""
+    task.cancel()
+    try:
+        await task
+    except asyncio.CancelledError:
+        pass
+
+
+@pytest.mark.asyncio
+async def test_drain_pending_blocks_while_subagents_running(tmp_path):
+    """_drain_pending should block when no messages are available but sub-agents are still running."""
+    from nanobot.bus.events import InboundMessage
+    from nanobot.session.manager import Session
+
+    loop = _make_drain_loop(tmp_path)
+    pending_queue: asyncio.Queue[InboundMessage] = asyncio.Queue()
+    session = Session(key="test:drain-block")
+    hang_task = _register_hanging_subagent(loop, session.key, "sub-drain-1")
+    drain_task = None
+
+    try:
+        injection_callback = await _capture_injection_callback(
+            loop, session=session, pending_queue=pending_queue,
+        )
+
+        # With sub-agents running and an empty queue, the callback should block
+        drain_task = asyncio.create_task(injection_callback())
+
+        # Give it a moment to enter the blocking wait
+        await asyncio.sleep(0.05)
+
+        # Should still be running (blocked on pending_queue.get())
+        assert not drain_task.done(), "drain should block while sub-agents are running"
+
+        # Now put a message in the queue (simulating sub-agent completion)
+        await pending_queue.put(InboundMessage(
+            sender_id="subagent",
+            channel="test",
+            chat_id="c1",
+            content="Sub-agent result",
+            media=None,
+            metadata={},
+        ))
+
+        # Should unblock and return results
+        results = await asyncio.wait_for(drain_task, timeout=2.0)
+        assert len(results) >= 1
+        assert results[0]["role"] == "user"
+        assert "Sub-agent result" in str(results[0]["content"])
+    finally:
+        # Always release the background tasks, even when an assertion fails
+        if drain_task is not None:
+            await _cancel_and_wait(drain_task)
+        await _cancel_and_wait(hang_task)
+
+
+@pytest.mark.asyncio
+async def test_drain_pending_no_block_when_no_subagents(tmp_path):
+    """_drain_pending should not block when no sub-agents are running."""
+    loop = _make_drain_loop(tmp_path)
+    pending_queue: asyncio.Queue = asyncio.Queue()
+
+    injection_callback = await _capture_injection_callback(
+        loop, session=None, pending_queue=pending_queue,
+    )
+
+    # With no sub-agents and empty queue, should return immediately
+    results = await asyncio.wait_for(injection_callback(), timeout=1.0)
+    assert results == []
+
+
+@pytest.mark.asyncio
+async def test_drain_pending_timeout(tmp_path):
+    """_drain_pending should return empty after timeout when sub-agents hang."""
+    from nanobot.session.manager import Session
+
+    loop = _make_drain_loop(tmp_path)
+    pending_queue: asyncio.Queue = asyncio.Queue()
+    session = Session(key="test:drain-timeout")
+    hang_task = _register_hanging_subagent(loop, session.key, "sub-timeout-1")
+
+    def _raise_timeout(awaitable, timeout=None):
+        # Close the never-awaited ``pending_queue.get()`` coroutine so the
+        # test does not emit a "coroutine was never awaited" RuntimeWarning.
+        awaitable.close()
+        raise asyncio.TimeoutError
+
+    try:
+        injection_callback = await _capture_injection_callback(
+            loop, session=session, pending_queue=pending_queue,
+        )
+
+        # Simulate an expired wait instead of sleeping for the real timeout
+        with patch("nanobot.agent.loop.asyncio.wait_for", side_effect=_raise_timeout):
+            results = await injection_callback()
+        assert results == []
+    finally:
+        await _cancel_and_wait(hang_task)