feat(long-task): add LongTaskTool for multi-step agent tasks

Implements a meta-ReAct loop where long-running tasks are broken into
sequential subagent steps, each starting fresh with the original goal
and progress from the previous step. This prevents context drift when
agents work on complex, multi-step tasks.

- Extract build_tool_registry() from SubagentManager for reuse
- Add run_step() for synchronous subagent execution (no bus announcement)
- Add HandoffTool and CompleteTool as signal mechanisms via shared dict
- Add LongTaskTool orchestrator with simplified prompt (8 iterations/step)
- Register LongTaskTool in main agent loop
- Add _extract_handoff_from_messages fallback for robustness
This commit is contained in:
chengyongru 2026-04-27 00:32:05 +08:00
parent ef268f47d2
commit bf5762a3d4
4 changed files with 581 additions and 2 deletions

View File

@ -6,12 +6,12 @@ import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any
from loguru import logger
from nanobot.agent.hook import AgentHook, AgentHookContext
from nanobot.agent.runner import AgentRunner, AgentRunSpec
from nanobot.agent.runner import AgentRunResult, AgentRunner, AgentRunSpec
from nanobot.agent.tools.context import ToolContext
from nanobot.agent.tools.file_state import FileStates
from nanobot.agent.tools.loader import ToolLoader
@ -22,6 +22,9 @@ from nanobot.config.schema import AgentDefaults, ToolsConfig
from nanobot.providers.base import LLMProvider
from nanobot.utils.prompt_templates import render_template
if TYPE_CHECKING:
from nanobot.agent.tools.base import Tool
@dataclass(slots=True)
class SubagentStatus:
@ -124,6 +127,38 @@ class SubagentManager:
self.model = model
self.runner.provider = provider
async def run_step(
    self,
    system_prompt: str,
    user_message: str,
    extra_tools: list["Tool"] | None = None,
    *,
    max_iterations: int = 8,
) -> AgentRunResult:
    """Run a single subagent step and return the result directly.

    Unlike ``spawn``, this awaits completion and hands the
    ``AgentRunResult`` straight back to the caller; nothing in this
    method announces the result on the message bus.

    Args:
        system_prompt: Content of the system message that opens the step.
        user_message: Content of the user message that follows it.
        extra_tools: Tools registered on top of the ones returned by
            ``_build_tools()``.
        max_iterations: Iteration budget passed to the runner. The default
            of 8 is deliberately lower than ``_run_subagent()``'s 15:
            long-task steps must be short to encourage handoff() calls
            instead of doing everything in one step.

    Returns:
        The value returned by ``self.runner.run`` for this step.

    Raises:
        ValueError: If ``max_iterations`` is smaller than 1.
    """
    if max_iterations < 1:
        raise ValueError("max_iterations must be at least 1")

    tools = self._build_tools()
    for extra in extra_tools or ():
        tools.register(extra)

    spec = AgentRunSpec(
        initial_messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        tools=tools,
        model=self.model,
        max_iterations=max_iterations,
        # Callers may match on the leading words of this message, so keep
        # the "Tool budget exhausted" prefix stable.
        max_iterations_message=(
            "Tool budget exhausted. "
            "Call handoff() or complete() earlier next time."
        ),
        max_tool_result_chars=self.max_tool_result_chars,
        fail_on_tool_error=False,
    )
    return await self.runner.run(spec)
async def spawn(
self,
task: str,

View File

@ -0,0 +1,214 @@
"""Long Task Tool: meta-ReAct loop for long-running tasks via subagent steps."""
from __future__ import annotations
from typing import Any, TYPE_CHECKING
from loguru import logger
from nanobot.agent.tools.base import Tool, tool_parameters
from nanobot.agent.tools.schema import StringSchema, IntegerSchema, tool_parameters_schema
if TYPE_CHECKING:
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.context import ToolContext
# ---------------------------------------------------------------------------
# Signal tools -- write progress/completion into a shared dict
# ---------------------------------------------------------------------------
@tool_parameters(
    tool_parameters_schema(
        message=StringSchema(
            "What you completed in this step and where results are saved. "
            "The next step will pick up from here.",
        ),
        required=["message"],
    )
)
class HandoffTool(Tool):
    """Signal tool: this step is finished, but the overall task is not.

    Executing it writes a ``"handoff"`` marker and the progress message
    into the dict supplied at construction time, so whoever owns that dict
    can read the signal once the step has returned.
    """

    def __init__(self, store: dict[str, str]) -> None:
        # Deliberately the caller's own dict (not a copy): writes made by
        # execute() must be visible to the caller.
        self._store = store

    @property
    def name(self) -> str:
        return "handoff"

    @property
    def description(self) -> str:
        what = "You are done with this step. Pass control to the next step. "
        when = "You MUST call this (or complete()) before your tool budget runs out."
        return what + when

    async def execute(self, message: str, **kwargs: Any) -> str:
        """Record the handoff signal and return a short acknowledgement."""
        self._store.update(type="handoff", payload=message)
        return "Progress recorded. The next step will continue from here."
@tool_parameters(
    tool_parameters_schema(
        summary=StringSchema("Final result summary of the entire task"),
        required=["summary"],
    )
)
class CompleteTool(Tool):
    """Signal tool: the whole long task is finished.

    Executing it writes a ``"complete"`` marker and the final summary into
    the dict supplied at construction time.
    """

    def __init__(self, store: dict[str, str]) -> None:
        # Deliberately the caller's own dict (not a copy): writes made by
        # execute() must be visible to the caller.
        self._store = store

    @property
    def name(self) -> str:
        return "complete"

    @property
    def description(self) -> str:
        text = "The ENTIRE goal is achieved. Call this only when nothing remains."
        return text

    async def execute(self, summary: str, **kwargs: Any) -> str:
        """Record the completion signal and return a short acknowledgement."""
        self._store.update(type="complete", payload=summary)
        return "Task marked as complete."
# ---------------------------------------------------------------------------
# System prompt for long-task subagent steps
# ---------------------------------------------------------------------------
# Number of tool calls advertised to each step in the user message built by
# _build_user_message(). NOTE(review): SubagentManager.run_step() applies its
# own iteration limit (8 at the time of writing); the two values are not
# linked in code, so keep them in sync by hand.
_STEP_BUDGET = 8
# Must match max_iterations_message set in SubagentManager.run_step()
# (_extract_handoff_from_messages() uses it to skip that canned reply).
_BUDGET_EXHAUSTED_PREFIX = "Tool budget exhausted"
# System prompt passed unchanged to every step by LongTaskTool.execute().
# The trailing backslashes join the following line into the same paragraph.
_LONG_TASK_SYSTEM_PROMPT = """\
You are one step in a chain. Do a small chunk of work, then call handoff().
1. Check the filesystem to see what's already done (ignore handoff notes).
2. Do the next small piece of work.
3. Call handoff() with what you did and where results are saved. \
If everything is truly done, call complete() instead.
You have very few tool calls. Do NOT try to finish everything. \
Do one chunk, call handoff(), done.
"""
def _build_user_message(goal: str, step: int, handoff: str) -> str:
    """Build the user message for a subagent step with budget warning.

    Args:
        goal: The original task description; always placed first.
        step: Zero-based step index; shown to the subagent as ``step + 1``.
        handoff: Progress text carried over from the previous step.

    Returns:
        ``goal`` followed by the budget note. On later steps that carry
        non-blank progress, a "## Previous Progress" section holding
        ``handoff`` is inserted between the two.
    """
    budget_note = (
        "\n\n---\n"
        f"Step {step + 1}. You have {_STEP_BUDGET} tool calls total. "
        "Reserve the last 1-2 calls for handoff() or complete(). "
        "If you run out of calls without calling one, your progress is LOST."
    )
    # A later step may arrive with no recorded progress (the previous step
    # produced neither a signal nor any text). Skip the section then rather
    # than showing an empty "Previous Progress" heading.
    has_progress = bool((handoff or "").strip())
    if step == 0 or not has_progress:
        return goal + budget_note
    return f"{goal}\n\n## Previous Progress\n{handoff}{budget_note}"
def _extract_handoff_from_messages(messages: list[dict[str, Any]]) -> str:
"""Extract useful content from messages when no signal was called.
Skips the generic max_iterations_message appended by the runner,
looking for actual subagent thinking/progress text instead.
"""
for msg in reversed(messages):
if msg.get("role") != "assistant":
continue
content = (msg.get("content") or "").strip()
if not content:
continue
if content.startswith(_BUDGET_EXHAUSTED_PREFIX):
continue
return content
return ""
# ---------------------------------------------------------------------------
# Long Task Tool — the orchestrator
# ---------------------------------------------------------------------------
@tool_parameters(
    tool_parameters_schema(
        goal=StringSchema("Description of the task to complete"),
        max_steps=IntegerSchema(
            description="Maximum number of subagent steps (default 20)",
            minimum=1,
            maximum=100,
        ),
        required=["goal"],
    )
)
class LongTaskTool(Tool):
    """Execute a long-running task via a meta-ReAct loop of subagent steps.

    Each step is one ``SubagentManager.run_step`` call that receives the
    original goal plus the progress text of the previous step. A step
    reports back through a fresh ``HandoffTool`` / ``CompleteTool`` pair
    that writes into a dict owned by this orchestrator.
    """

    def __init__(self, manager: SubagentManager) -> None:
        self._manager = manager

    @classmethod
    def enabled(cls, ctx: ToolContext) -> bool:
        """Return True when the context provides a subagent manager."""
        return ctx.subagent_manager is not None

    @classmethod
    def create(cls, ctx: ToolContext) -> Tool:
        """Build the tool from the context's subagent manager."""
        return cls(manager=ctx.subagent_manager)

    @property
    def name(self) -> str:
        return "long_task"

    @property
    def description(self) -> str:
        return (
            "Execute a long-running task that cannot fit in a single context window. "
            "The work is broken into sequential steps, each starting fresh with the "
            "original goal and progress from the previous step. Use this for batch "
            "processing (auditing many files, processing many items), large-scale "
            "refactoring, or any multi-step task where you might lose track of the "
            "goal. For simple independent tasks, use spawn instead."
        )

    async def execute(self, goal: str, max_steps: int = 20, **kwargs: Any) -> str:
        """Run up to ``max_steps`` subagent steps and return a text result.

        Args:
            goal: Task description repeated to every step.
            max_steps: Upper bound on the number of steps.

        Returns:
            The ``complete()`` summary if a step signalled completion.
            Otherwise a failure message (when ``run_step`` raised) or a
            max-steps message, each followed by the last recorded progress
            when there is any.
        """
        handoff = ""
        for step in range(max_steps):
            # Fresh store per step so a signal from an earlier step can never
            # be mistaken for one from the current step.
            signal_store: dict[str, str] = {}
            user_msg = _build_user_message(goal, step, handoff)
            try:
                result = await self._manager.run_step(
                    system_prompt=_LONG_TASK_SYSTEM_PROMPT,
                    user_message=user_msg,
                    extra_tools=[HandoffTool(signal_store), CompleteTool(signal_store)],
                )
            except Exception:
                # Tool boundary: report the failure as text instead of
                # raising, and keep whatever progress was recorded so far.
                logger.exception("long_task step {}/{} failed", step + 1, max_steps)
                if handoff:
                    return (
                        f"Long task failed at step {step + 1}/{max_steps}. "
                        f"Last progress:\n{handoff}"
                    )
                return f"Long task failed at step {step + 1}/{max_steps}."

            sig_type = signal_store.get("type")
            logger.info(
                "long_task step {}/{}: signal={}, stop_reason={}, tools={}",
                step + 1, max_steps, sig_type or "none",
                result.stop_reason,
                result.tools_used,
            )
            if sig_type == "complete":
                return signal_store["payload"]
            if sig_type == "handoff":
                handoff = signal_store["payload"]
            else:
                # No signal tool called: fall back to the step's own text.
                # If there is none either, keep the previous progress note
                # instead of wiping it with an empty string.
                handoff = _extract_handoff_from_messages(result.messages) or handoff

        summary = f"Long task reached max steps ({max_steps})."
        if handoff:
            return f"{summary} Last progress:\n{handoff}"
        return summary

View File

@ -0,0 +1,262 @@
"""Tests for Long Task Tool: HandoffTool, CompleteTool, LongTaskTool."""
import pytest
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_handoff_tool_stores_signal():
    """HandoffTool.execute writes a 'handoff' signal and its message to the store."""
    from nanobot.agent.tools.long_task import HandoffTool

    note = "Processed items 1-8. Results in out.md. Continue with item 9."
    shared: dict[str, str] = {}

    reply = await HandoffTool(shared).execute(message=note)

    assert reply == "Progress recorded. The next step will continue from here."
    assert shared["type"] == "handoff"
    assert shared["payload"] == note
@pytest.mark.asyncio
async def test_complete_tool_stores_signal():
    """CompleteTool.execute writes a 'complete' signal and its summary to the store."""
    from nanobot.agent.tools.long_task import CompleteTool

    final_summary = "All 100 items processed. Summary in report.md"
    shared: dict[str, str] = {}

    reply = await CompleteTool(shared).execute(summary=final_summary)

    assert reply == "Task marked as complete."
    assert shared["type"] == "complete"
    assert shared["payload"] == final_summary
@pytest.mark.asyncio
async def test_signal_tools_overwrite_on_multiple_calls():
    """Last call wins -- the orchestrator only reads the final signal."""
    from nanobot.agent.tools.long_task import CompleteTool, HandoffTool

    shared: dict[str, str] = {}
    handoff_tool, complete_tool = HandoffTool(shared), CompleteTool(shared)

    # First signal lands in the shared dict...
    await handoff_tool.execute(message="first progress")
    assert shared["type"] == "handoff"

    # ...and the second one replaces both keys.
    await complete_tool.execute(summary="done early")
    assert shared["type"] == "complete"
    assert shared["payload"] == "done early"
# ---------------------------------------------------------------------------
# Helper: minimal SubagentManager stub
# ---------------------------------------------------------------------------
def _make_manager_stub():
"""Create a minimal SubagentManager stub with a mockable run_step."""
mgr = MagicMock()
mgr.run_step = AsyncMock()
return mgr
def _step_result(**overrides):
"""Create a minimal AgentRunResult-like namespace."""
defaults = dict(
final_content="step done",
messages=[],
tool_events=[],
stop_reason="completed",
tools_used=[],
)
defaults.update(overrides)
return SimpleNamespace(**defaults)
# ---------------------------------------------------------------------------
# LongTaskTool orchestrator tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_long_task_completes_in_one_step():
    """A step that calls complete() right away ends the task with its summary."""
    from nanobot.agent.tools.long_task import LongTaskTool

    async def finish_immediately(*, system_prompt, user_message, extra_tools):
        for signal in (t for t in extra_tools if t.name == "complete"):
            await signal.execute(summary="All done. Report in summary.md")
        return _step_result(final_content="All done.", tools_used=["complete"])

    manager = _make_manager_stub()
    manager.run_step.side_effect = finish_immediately

    outcome = await LongTaskTool(manager=manager).execute(goal="Audit all issues.")

    assert outcome == "All done. Report in summary.md"
@pytest.mark.asyncio
async def test_long_task_completes_after_multiple_handoffs():
    """Two handoff() steps followed by a complete() step finish on the third call."""
    from nanobot.agent.tools.long_task import LongTaskTool

    seen_messages: list[str] = []

    async def fire(tools, tool_name, **payload):
        # Execute every offered tool carrying the requested name.
        for candidate in tools:
            if candidate.name == tool_name:
                await candidate.execute(**payload)

    async def scripted_step(*, system_prompt, user_message, extra_tools):
        seen_messages.append(user_message)
        turn = len(seen_messages)
        if turn == 1:
            await fire(extra_tools, "handoff", message="Processed 1-8.")
        elif turn == 2:
            # The second step must see the first step's progress note.
            assert "Processed 1-8." in user_message
            assert "8 tool calls" in user_message
            await fire(extra_tools, "handoff", message="Processed 9-16.")
        else:
            await fire(extra_tools, "complete", summary="All 16 items audited.")
        return _step_result(tools_used=["handoff"])

    manager = _make_manager_stub()
    manager.run_step.side_effect = scripted_step

    outcome = await LongTaskTool(manager=manager).execute(goal="Audit 16 issues.")

    assert outcome == "All 16 items audited."
    assert len(seen_messages) == 3
@pytest.mark.asyncio
async def test_long_task_fallback_when_no_signal_called():
    """Without handoff()/complete(), progress is taken from the assistant messages."""
    from nanobot.agent.tools.long_task import LongTaskTool

    transcript = [
        {"role": "system", "content": "..."},
        {"role": "user", "content": "..."},
        {"role": "assistant", "content": "I processed items 1-5. Results in out.md."},
        {"role": "tool", "content": "ok"},
        {"role": "assistant", "content": "Tool budget exhausted. Call handoff() earlier next time."},
    ]

    async def silent_step(*, system_prompt, user_message, extra_tools):
        return _step_result(
            final_content="Tool budget exhausted.",
            messages=transcript,
            stop_reason="max_iterations",
        )

    manager = _make_manager_stub()
    manager.run_step.side_effect = silent_step

    outcome = await LongTaskTool(manager=manager).execute(goal="Do something.", max_steps=2)

    # Both steps stay silent, so the run ends at max_steps with the fallback text.
    assert "max steps (2)" in outcome
    assert "I processed items 1-5" in outcome
@pytest.mark.asyncio
async def test_long_task_goal_appears_in_system_prompt():
    """Verify what a step receives: the long-task system prompt and the goal.

    The system prompt is checked for its key instructions. The goal is
    delivered through the user message, so that is where it is asserted.
    """
    from nanobot.agent.tools.long_task import LongTaskTool

    mgr = _make_manager_stub()
    captured_prompts = []
    captured_user_messages = []

    async def fake_run_step(*, system_prompt, user_message, extra_tools):
        captured_prompts.append(system_prompt)
        captured_user_messages.append(user_message)
        for t in extra_tools:
            if t.name == "complete":
                await t.execute(summary="done")
        return _step_result(final_content="done")

    mgr.run_step.side_effect = fake_run_step
    tool = LongTaskTool(manager=mgr)
    await tool.execute(goal="Audit everything.")

    assert len(captured_prompts) == 1
    assert "handoff()" in captured_prompts[0]
    assert "complete()" in captured_prompts[0]
    assert "filesystem" in captured_prompts[0]
    # The check the test name promises: the step must actually see the goal.
    assert "Audit everything." in captured_user_messages[0]
# ---------------------------------------------------------------------------
# Helper function tests
# ---------------------------------------------------------------------------
def test_build_user_message_step_0():
    """The first step gets the goal and budget note, with no progress section."""
    from nanobot.agent.tools.long_task import _build_user_message

    text = _build_user_message("Audit all issues.", step=0, handoff="")

    assert text.startswith("Audit all issues.")
    for fragment in ("Step 1", "8 tool calls"):
        assert fragment in text
    assert "Previous Progress" not in text
def test_build_user_message_later_step():
    """A later step carries the goal, the previous progress and the budget note."""
    from nanobot.agent.tools.long_task import _build_user_message

    text = _build_user_message("Audit all issues.", step=3, handoff="Did 1-10.")

    expected_fragments = (
        "Audit all issues.",
        "Previous Progress",
        "Did 1-10.",
        "Step 4",
        "8 tool calls",
    )
    for fragment in expected_fragments:
        assert fragment in text
def test_extract_handoff_from_messages():
    """The newest non-empty assistant message is returned."""
    from nanobot.agent.tools.long_task import _extract_handoff_from_messages

    transcript = [
        {"role": "system", "content": "sys"},
        {"role": "user", "content": "do it"},
        {"role": "assistant", "content": ""},
        {"role": "tool", "content": "result"},
        {"role": "assistant", "content": "I processed items 1-3."},
    ]

    extracted = _extract_handoff_from_messages(transcript)

    assert extracted == "I processed items 1-3."
def test_extract_handoff_skips_budget_message():
    """The budget-exhausted reply is ignored in favour of the earlier progress text."""
    from nanobot.agent.tools.long_task import _extract_handoff_from_messages

    transcript = [
        {"role": "system", "content": "sys"},
        {"role": "user", "content": "do it"},
        {"role": "assistant", "content": "I processed items 1-3."},
        {"role": "tool", "content": "result"},
        {"role": "assistant", "content": "Tool budget exhausted. Call handoff() earlier."},
    ]

    extracted = _extract_handoff_from_messages(transcript)

    # The trailing budget message must not hide the real progress note.
    assert extracted == "I processed items 1-3."
def test_extract_handoff_from_empty_messages():
    """With no assistant message to draw from, the result is an empty string."""
    from nanobot.agent.tools.long_task import _extract_handoff_from_messages

    no_messages = []
    system_only = [{"role": "system", "content": "sys"}]

    assert _extract_handoff_from_messages(no_messages) == ""
    assert _extract_handoff_from_messages(system_only) == ""
# ---------------------------------------------------------------------------
# Integration: verify LongTaskTool is wired into the main agent loop
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_long_task_registered_in_tool_registry(tmp_path):
    """An AgentLoop built with default arguments exposes a tool named long_task."""
    from nanobot.agent.loop import AgentLoop
    from nanobot.bus.queue import MessageBus

    fake_provider = MagicMock()
    fake_provider.get_default_model.return_value = "test-model"

    agent_loop = AgentLoop(
        bus=MessageBus(),
        provider=fake_provider,
        workspace=tmp_path,
        model="test-model",
    )

    registered = agent_loop.tools.get("long_task")
    assert registered is not None
    assert registered.name == "long_task"

View File

@ -337,6 +337,74 @@ async def test_drain_pending_blocks_while_subagents_running(tmp_path):
pass
@pytest.mark.asyncio
async def test_run_step_returns_agent_run_result(tmp_path):
    """run_step passes prompts and extra tools to the runner and returns its result."""
    from nanobot.agent.subagent import SubagentManager
    from nanobot.bus.queue import MessageBus

    message_bus = MessageBus()
    fake_provider = MagicMock()
    fake_provider.get_default_model.return_value = "test-model"
    manager = SubagentManager(
        provider=fake_provider,
        workspace=tmp_path,
        bus=message_bus,
        max_tool_result_chars=5000,
    )

    # Replace the real runner call with a canned result.
    canned = SimpleNamespace(
        stop_reason="completed",
        final_content="step result",
        error=None,
        tool_events=[],
        messages=[],
        usage={},
        had_injections=False,
        tools_used=[],
    )
    manager.runner.run = AsyncMock(return_value=canned)

    # A throwaway tool whose only job is to prove that extra tools get registered.
    from nanobot.agent.tools.base import Tool, tool_parameters
    from nanobot.agent.tools.schema import StringSchema, tool_parameters_schema

    @tool_parameters(
        tool_parameters_schema(
            msg=StringSchema("test"),
            required=["msg"],
        )
    )
    class DummySignalTool(Tool):
        @property
        def name(self):
            return "dummy_signal"

        @property
        def description(self):
            return "test signal"

        async def execute(self, msg="", **kwargs):
            return "ok"

    outcome = await manager.run_step(
        system_prompt="You are a test subagent.",
        user_message="Do something.",
        extra_tools=[DummySignalTool()],
    )
    assert outcome.final_content == "step result"

    # The spec is the first positional argument handed to runner.run.
    sent_spec = manager.runner.run.call_args.args[0]
    assert sent_spec.tools.has("dummy_signal")
    assert sent_spec.tools.has("read_file")
    assert sent_spec.fail_on_tool_error is False

    # The prompts arrive as a system/user message pair, in that order.
    system_msg, user_msg = sent_spec.initial_messages[0], sent_spec.initial_messages[1]
    assert system_msg["role"] == "system"
    assert system_msg["content"] == "You are a test subagent."
    assert user_msg["role"] == "user"
    assert user_msg["content"] == "Do something."
@pytest.mark.asyncio
async def test_drain_pending_no_block_when_no_subagents(tmp_path):
"""_drain_pending should not block when no sub-agents are running."""