diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index e71eb4834..adff97e7a 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -6,12 +6,12 @@ import time import uuid from dataclasses import dataclass, field from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any from loguru import logger from nanobot.agent.hook import AgentHook, AgentHookContext -from nanobot.agent.runner import AgentRunner, AgentRunSpec +from nanobot.agent.runner import AgentRunResult, AgentRunner, AgentRunSpec from nanobot.agent.tools.context import ToolContext from nanobot.agent.tools.file_state import FileStates from nanobot.agent.tools.loader import ToolLoader @@ -22,6 +22,9 @@ from nanobot.config.schema import AgentDefaults, ToolsConfig from nanobot.providers.base import LLMProvider from nanobot.utils.prompt_templates import render_template +if TYPE_CHECKING: + from nanobot.agent.tools.base import Tool + @dataclass(slots=True) class SubagentStatus: @@ -124,6 +127,38 @@ class SubagentManager: self.model = model self.runner.provider = provider + async def run_step( + self, + system_prompt: str, + user_message: str, + extra_tools: list["Tool"] | None = None, + ) -> AgentRunResult: + """Run a single subagent step and return the result directly. + + Unlike ``spawn``, this awaits completion and returns the + ``AgentRunResult`` — no message-bus announcement. + """ + tools = self._build_tools() + for t in (extra_tools or []): + tools.register(t) + # Deliberately lower than _run_subagent()'s 15: long-task steps must + # be short to encourage handoff() calls instead of doing everything. + return await self.runner.run(AgentRunSpec( + initial_messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_message}, + ], + tools=tools, + model=self.model, + max_iterations=8, + max_iterations_message=( + "Tool budget exhausted. " + "Call handoff() or complete() earlier next time." + ), + max_tool_result_chars=self.max_tool_result_chars, + fail_on_tool_error=False, + )) + async def spawn( self, task: str, diff --git a/nanobot/agent/tools/long_task.py b/nanobot/agent/tools/long_task.py new file mode 100644 index 000000000..cc5d8c786 --- /dev/null +++ b/nanobot/agent/tools/long_task.py @@ -0,0 +1,214 @@ +"""Long Task Tool: meta-ReAct loop for long-running tasks via subagent steps.""" + +from __future__ import annotations + +from typing import Any, TYPE_CHECKING + +from loguru import logger + +from nanobot.agent.tools.base import Tool, tool_parameters +from nanobot.agent.tools.schema import StringSchema, IntegerSchema, tool_parameters_schema + +if TYPE_CHECKING: + from nanobot.agent.subagent import SubagentManager + from nanobot.agent.tools.context import ToolContext + + +# --------------------------------------------------------------------------- +# Signal tools -- write progress/completion into a shared dict +# --------------------------------------------------------------------------- + +@tool_parameters( + tool_parameters_schema( + message=StringSchema( + "What you completed in this step and where results are saved. " + "The next step will pick up from here.", + ), + required=["message"], + ) +) +class HandoffTool(Tool): + """Signal that the step is done but the overall task continues.""" + + def __init__(self, store: dict[str, str]) -> None: + self._store = store + + @property + def name(self) -> str: + return "handoff" + + @property + def description(self) -> str: + return ( + "You are done with this step. Pass control to the next step. " + "You MUST call this (or complete()) before your tool budget runs out." + ) + + async def execute(self, message: str, **kwargs: Any) -> str: + self._store["type"] = "handoff" + self._store["payload"] = message + return "Progress recorded. The next step will continue from here." + + +@tool_parameters( + tool_parameters_schema( + summary=StringSchema("Final result summary of the entire task"), + required=["summary"], + ) +) +class CompleteTool(Tool): + """Signal that the entire long task is finished.""" + + def __init__(self, store: dict[str, str]) -> None: + self._store = store + + @property + def name(self) -> str: + return "complete" + + @property + def description(self) -> str: + return ( + "The ENTIRE goal is achieved. Call this only when nothing remains." + ) + + async def execute(self, summary: str, **kwargs: Any) -> str: + self._store["type"] = "complete" + self._store["payload"] = summary + return "Task marked as complete." + + +# --------------------------------------------------------------------------- +# System prompt for long-task subagent steps +# --------------------------------------------------------------------------- + +_STEP_BUDGET = 8 + +# Must match max_iterations_message set in SubagentManager.run_step() +_BUDGET_EXHAUSTED_PREFIX = "Tool budget exhausted" + +_LONG_TASK_SYSTEM_PROMPT = """\ +You are one step in a chain. Do a small chunk of work, then call handoff(). + +1. Check the filesystem to see what's already done (ignore handoff notes). +2. Do the next small piece of work. +3. Call handoff() with what you did and where results are saved. \ +If everything is truly done, call complete() instead. + +You have very few tool calls. Do NOT try to finish everything. \ +Do one chunk, call handoff(), done. +""" + + +def _build_user_message(goal: str, step: int, handoff: str) -> str: + """Build the user message for a subagent step with budget warning.""" + budget_note = ( + f"\n\n---\n" + f"Step {step + 1}. You have {_STEP_BUDGET} tool calls total. " + f"Reserve the last 1-2 calls for handoff() or complete(). " + f"If you run out of calls without calling one, your progress is LOST." + ) + if step == 0: + return goal + budget_note + return f"{goal}\n\n## Previous Progress\n{handoff}{budget_note}" + + +def _extract_handoff_from_messages(messages: list[dict[str, Any]]) -> str: + """Extract useful content from messages when no signal was called. + + Skips the generic max_iterations_message appended by the runner, + looking for actual subagent thinking/progress text instead. + """ + for msg in reversed(messages): + if msg.get("role") != "assistant": + continue + content = (msg.get("content") or "").strip() + if not content: + continue + if content.startswith(_BUDGET_EXHAUSTED_PREFIX): + continue + return content + return "" + + +# --------------------------------------------------------------------------- +# Long Task Tool — the orchestrator +# --------------------------------------------------------------------------- + +@tool_parameters( + tool_parameters_schema( + goal=StringSchema("Description of the task to complete"), + max_steps=IntegerSchema( + description="Maximum number of subagent steps (default 20)", + minimum=1, + maximum=100, + ), + required=["goal"], + ) +) +class LongTaskTool(Tool): + """Execute a long-running task via a meta-ReAct loop of subagent steps.""" + + def __init__(self, manager: SubagentManager) -> None: + self._manager = manager + + @classmethod + def enabled(cls, ctx: ToolContext) -> bool: + return ctx.subagent_manager is not None + + @classmethod + def create(cls, ctx: ToolContext) -> Tool: + return cls(manager=ctx.subagent_manager) + + @property + def name(self) -> str: + return "long_task" + + @property + def description(self) -> str: + return ( + "Execute a long-running task that cannot fit in a single context window. " + "The work is broken into sequential steps, each starting fresh with the " + "original goal and progress from the previous step. Use this for batch " + "processing (auditing many files, processing many items), large-scale " + "refactoring, or any multi-step task where you might lose track of the " + "goal. For simple independent tasks, use spawn instead." + ) + + async def execute(self, goal: str, max_steps: int = 20, **kwargs: Any) -> str: + handoff = "" + for step in range(max_steps): + signal_store: dict[str, str] = {} + user_msg = _build_user_message(goal, step, handoff) + try: + result = await self._manager.run_step( + system_prompt=_LONG_TASK_SYSTEM_PROMPT, + user_message=user_msg, + extra_tools=[HandoffTool(signal_store), CompleteTool(signal_store)], + ) + except Exception: + logger.exception("long_task step {}/{} failed", step + 1, max_steps) + if handoff: + return ( + f"Long task failed at step {step + 1}/{max_steps}. " + f"Last progress:\n{handoff}" + ) + return f"Long task failed at step {step + 1}/{max_steps}." + sig_type = signal_store.get("type") + logger.info( + "long_task step {}/{}: signal={}, stop_reason={}, tools={}", + step + 1, max_steps, sig_type or "none", + result.stop_reason, + result.tools_used, + ) + if sig_type == "complete": + return signal_store["payload"] + elif sig_type == "handoff": + handoff = signal_store["payload"] + else: + # No signal tool called — extract useful content as fallback + handoff = _extract_handoff_from_messages(result.messages) + return ( + f"Long task reached max steps ({max_steps}). " + f"Last progress:\n{handoff}" + ) diff --git a/tests/agent/tools/test_long_task.py b/tests/agent/tools/test_long_task.py new file mode 100644 index 000000000..e08249251 --- /dev/null +++ b/tests/agent/tools/test_long_task.py @@ -0,0 +1,262 @@ +"""Tests for Long Task Tool: HandoffTool, CompleteTool, LongTaskTool.""" + +import pytest +from types import SimpleNamespace + +from unittest.mock import AsyncMock, MagicMock + + +@pytest.mark.asyncio +async def test_handoff_tool_stores_signal(): + from nanobot.agent.tools.long_task import HandoffTool + + store: dict[str, str] = {} + tool = HandoffTool(store) + result = await tool.execute(message="Processed items 1-8. Results in out.md. Continue with item 9.") + assert result == "Progress recorded. The next step will continue from here." + assert store["type"] == "handoff" + assert store["payload"] == "Processed items 1-8. Results in out.md. Continue with item 9." + + +@pytest.mark.asyncio +async def test_complete_tool_stores_signal(): + from nanobot.agent.tools.long_task import CompleteTool + + store: dict[str, str] = {} + tool = CompleteTool(store) + result = await tool.execute(summary="All 100 items processed. Summary in report.md") + assert result == "Task marked as complete." + assert store["type"] == "complete" + assert store["payload"] == "All 100 items processed. Summary in report.md" + + +@pytest.mark.asyncio +async def test_signal_tools_overwrite_on_multiple_calls(): + """Last call wins -- the orchestrator only reads the final signal.""" + from nanobot.agent.tools.long_task import HandoffTool, CompleteTool + + store: dict[str, str] = {} + handoff = HandoffTool(store) + complete = CompleteTool(store) + await handoff.execute(message="first progress") + assert store["type"] == "handoff" + await complete.execute(summary="done early") + assert store["type"] == "complete" + assert store["payload"] == "done early" + + +# --------------------------------------------------------------------------- +# Helper: minimal SubagentManager stub +# --------------------------------------------------------------------------- + +def _make_manager_stub(): + """Create a minimal SubagentManager stub with a mockable run_step.""" + mgr = MagicMock() + mgr.run_step = AsyncMock() + return mgr + + +def _step_result(**overrides): + """Create a minimal AgentRunResult-like namespace.""" + defaults = dict( + final_content="step done", + messages=[], + tool_events=[], + stop_reason="completed", + tools_used=[], + ) + defaults.update(overrides) + return SimpleNamespace(**defaults) + + +# --------------------------------------------------------------------------- +# LongTaskTool orchestrator tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_long_task_completes_in_one_step(): + """Subagent calls complete() immediately.""" + from nanobot.agent.tools.long_task import LongTaskTool + + mgr = _make_manager_stub() + + async def fake_run_step(*, system_prompt, user_message, extra_tools): + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="All done. Report in summary.md") + return _step_result( + final_content="All done.", + tools_used=["complete"], + ) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + result = await tool.execute(goal="Audit all issues.") + assert result == "All done. Report in summary.md" + + +@pytest.mark.asyncio +async def test_long_task_completes_after_multiple_handoffs(): + """Subagent calls handoff() twice then complete().""" + from nanobot.agent.tools.long_task import LongTaskTool + + mgr = _make_manager_stub() + call_count = 0 + + async def fake_run_step(*, system_prompt, user_message, extra_tools): + nonlocal call_count + call_count += 1 + if call_count == 1: + for t in extra_tools: + if t.name == "handoff": + await t.execute(message="Processed 1-8.") + elif call_count == 2: + assert "Processed 1-8." in user_message + assert "8 tool calls" in user_message + for t in extra_tools: + if t.name == "handoff": + await t.execute(message="Processed 9-16.") + else: + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="All 16 items audited.") + return _step_result(tools_used=["handoff"]) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + result = await tool.execute(goal="Audit 16 issues.") + assert result == "All 16 items audited." + assert call_count == 3 + + +@pytest.mark.asyncio +async def test_long_task_fallback_when_no_signal_called(): + """Subagent doesn't call handoff/complete — extract progress from messages.""" + from nanobot.agent.tools.long_task import LongTaskTool + + mgr = _make_manager_stub() + + async def fake_run_step(*, system_prompt, user_message, extra_tools): + return _step_result( + final_content="Tool budget exhausted.", + messages=[ + {"role": "system", "content": "..."}, + {"role": "user", "content": "..."}, + {"role": "assistant", "content": "I processed items 1-5. Results in out.md."}, + {"role": "tool", "content": "ok"}, + {"role": "assistant", "content": "Tool budget exhausted. Call handoff() earlier next time."}, + ], + stop_reason="max_iterations", + ) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + result = await tool.execute(goal="Do something.", max_steps=2) + # Should reach max_steps and return the fallback extracted from messages + assert "max steps (2)" in result + assert "I processed items 1-5" in result + + +@pytest.mark.asyncio +async def test_long_task_goal_appears_in_system_prompt(): + """Verify every step's system_prompt contains the long task system prompt.""" + from nanobot.agent.tools.long_task import LongTaskTool + + mgr = _make_manager_stub() + captured_prompts = [] + + async def fake_run_step(*, system_prompt, user_message, extra_tools): + captured_prompts.append(system_prompt) + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="done") + return _step_result(final_content="done") + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + await tool.execute(goal="Audit everything.") + assert len(captured_prompts) == 1 + assert "handoff()" in captured_prompts[0] + assert "complete()" in captured_prompts[0] + assert "filesystem" in captured_prompts[0] + + +# --------------------------------------------------------------------------- +# Helper function tests +# --------------------------------------------------------------------------- + + +def test_build_user_message_step_0(): + from nanobot.agent.tools.long_task import _build_user_message + + msg = _build_user_message("Audit all issues.", step=0, handoff="") + assert msg.startswith("Audit all issues.") + assert "Step 1" in msg + assert "8 tool calls" in msg + assert "Previous Progress" not in msg + + +def test_build_user_message_later_step(): + from nanobot.agent.tools.long_task import _build_user_message + + msg = _build_user_message("Audit all issues.", step=3, handoff="Did 1-10.") + assert "Audit all issues." in msg + assert "Previous Progress" in msg + assert "Did 1-10." in msg + assert "Step 4" in msg + assert "8 tool calls" in msg + + +def test_extract_handoff_from_messages(): + from nanobot.agent.tools.long_task import _extract_handoff_from_messages + + messages = [ + {"role": "system", "content": "sys"}, + {"role": "user", "content": "do it"}, + {"role": "assistant", "content": ""}, + {"role": "tool", "content": "result"}, + {"role": "assistant", "content": "I processed items 1-3."}, + ] + assert _extract_handoff_from_messages(messages) == "I processed items 1-3." + + +def test_extract_handoff_skips_budget_message(): + from nanobot.agent.tools.long_task import _extract_handoff_from_messages + + messages = [ + {"role": "system", "content": "sys"}, + {"role": "user", "content": "do it"}, + {"role": "assistant", "content": "I processed items 1-3."}, + {"role": "tool", "content": "result"}, + {"role": "assistant", "content": "Tool budget exhausted. Call handoff() earlier."}, + ] + # Should skip the budget message and find the actual progress + assert _extract_handoff_from_messages(messages) == "I processed items 1-3." + + +def test_extract_handoff_from_empty_messages(): + from nanobot.agent.tools.long_task import _extract_handoff_from_messages + + assert _extract_handoff_from_messages([]) == "" + assert _extract_handoff_from_messages([{"role": "system", "content": "sys"}]) == "" + + +# --------------------------------------------------------------------------- +# Integration: verify LongTaskTool is wired into the main agent loop +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_long_task_registered_in_tool_registry(tmp_path): + """Verify LongTaskTool appears in the main agent's tool registry.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model") + tool = loop.tools.get("long_task") + assert tool is not None + assert tool.name == "long_task" diff --git a/tests/agent/tools/test_subagent_tools.py b/tests/agent/tools/test_subagent_tools.py index c0ee8662e..05dc82c6e 100644 --- a/tests/agent/tools/test_subagent_tools.py +++ b/tests/agent/tools/test_subagent_tools.py @@ -337,6 +337,74 @@ async def test_drain_pending_blocks_while_subagents_running(tmp_path): pass +@pytest.mark.asyncio +async def test_run_step_returns_agent_run_result(tmp_path): + """run_step should execute a single subagent step and return AgentRunResult.""" + from nanobot.agent.subagent import SubagentManager + from nanobot.bus.queue import MessageBus + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + mgr = SubagentManager( + provider=provider, + workspace=tmp_path, + bus=bus, + max_tool_result_chars=5000, + ) + + fake_result = SimpleNamespace( + stop_reason="completed", + final_content="step result", + error=None, + tool_events=[], + messages=[], + usage={}, + had_injections=False, + tools_used=[], + ) + mgr.runner.run = AsyncMock(return_value=fake_result) + + # Create a dummy extra tool to verify injection + from nanobot.agent.tools.base import Tool, tool_parameters + from nanobot.agent.tools.schema import StringSchema, tool_parameters_schema + + @tool_parameters( + tool_parameters_schema( + msg=StringSchema("test"), + required=["msg"], + ) + ) + class DummySignalTool(Tool): + @property + def name(self): + return "dummy_signal" + @property + def description(self): + return "test signal" + async def execute(self, msg="", **kwargs): + return "ok" + + result = await mgr.run_step( + system_prompt="You are a test subagent.", + user_message="Do something.", + extra_tools=[DummySignalTool()], + ) + assert result.final_content == "step result" + + # Verify runner.run was called with correct spec + call_args = mgr.runner.run.call_args + spec = call_args[0][0] + assert spec.tools.has("dummy_signal") + assert spec.tools.has("read_file") + assert spec.fail_on_tool_error is False + # Verify system prompt and user message + assert spec.initial_messages[0]["role"] == "system" + assert spec.initial_messages[0]["content"] == "You are a test subagent." + assert spec.initial_messages[1]["role"] == "user" + assert spec.initial_messages[1]["content"] == "Do something." + + @pytest.mark.asyncio async def test_drain_pending_no_block_when_no_subagents(tmp_path): """_drain_pending should not block when no sub-agents are running."""