From 78ecb2a99a0f9da1e1d086ffe0ce08d5488718f5 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Wed, 13 May 2026 00:55:52 +0800 Subject: [PATCH] feat(long-task): major overhaul with structured handoffs, validation, and observability - Structured HandoffState: HandoffTool now accepts files_created, files_modified, next_step_hint, and verification fields instead of a plain string. Progress is passed between steps as structured data. - Completion validation round: After complete() is called, a dedicated validator step runs to verify the claim against the original goal. If validation fails, the task continues rather than returning a false completion. - Dynamic prompt system: 3 Jinja2 templates (step_start, step_middle, step_final) selected based on step number. Final steps get tighter budget and stronger "wrap up" guidance. - Automatic file change tracking: Extracts write_file/edit_file events from tool_events and injects them into the next step's context if the subagent forgot to report them explicitly. - Budget tracking & adaptive strategy: Cumulative token usage is tracked across steps. Per-step tool budget drops from 8 to 4 in the last two steps to force handoff/completion. - Crash retry with graceful degradation: A step that crashes is retried once. Persistent crashes terminate the task and return partial progress. - Full observability hooks for future WebUI integration: - set_hooks() with on_step_start, on_step_complete, on_handoff, on_validation_started, on_validation_passed, on_validation_failed, on_task_complete, on_task_error, and catch-all on_event. - Readable state properties: current_step, total_steps, status, last_handoff, cumulative_usage, goal. - inject_correction() allows external code to send user corrections that are injected into the next step's prompt. - run_step() accepts optional max_iterations for dynamic budget control. All 27 long-task tests and 11 subagent tests pass. --- nanobot/agent/subagent.py | 3 +- nanobot/agent/tools/long_task.py | 572 +++++++++++++++--- .../templates/agent/long_task/step_final.md | 28 + .../templates/agent/long_task/step_middle.md | 38 ++ .../templates/agent/long_task/step_start.md | 14 + .../templates/agent/long_task/validation.md | 16 + tests/agent/tools/test_long_task.py | 490 ++++++++++++--- 7 files changed, 992 insertions(+), 169 deletions(-) create mode 100644 nanobot/templates/agent/long_task/step_final.md create mode 100644 nanobot/templates/agent/long_task/step_middle.md create mode 100644 nanobot/templates/agent/long_task/step_start.md create mode 100644 nanobot/templates/agent/long_task/validation.md diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index adff97e7a..e9778f592 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -132,6 +132,7 @@ class SubagentManager: system_prompt: str, user_message: str, extra_tools: list["Tool"] | None = None, + max_iterations: int | None = None, ) -> AgentRunResult: """Run a single subagent step and return the result directly. @@ -150,7 +151,7 @@ class SubagentManager: ], tools=tools, model=self.model, - max_iterations=8, + max_iterations=max_iterations if max_iterations is not None else 8, max_iterations_message=( "Tool budget exhausted. " "Call handoff() or complete() earlier next time." diff --git a/nanobot/agent/tools/long_task.py b/nanobot/agent/tools/long_task.py index 5f8d45a06..bf0ca3968 100644 --- a/nanobot/agent/tools/long_task.py +++ b/nanobot/agent/tools/long_task.py @@ -2,12 +2,20 @@ from __future__ import annotations +import time +from dataclasses import dataclass, field from typing import Any, TYPE_CHECKING from loguru import logger from nanobot.agent.tools.base import Tool, tool_parameters -from nanobot.agent.tools.schema import StringSchema, IntegerSchema, tool_parameters_schema +from nanobot.agent.tools.schema import ( + ArraySchema, + IntegerSchema, + StringSchema, + tool_parameters_schema, +) +from nanobot.utils.prompt_templates import render_template if TYPE_CHECKING: from nanobot.agent.subagent import SubagentManager @@ -15,7 +23,33 @@ if TYPE_CHECKING: # --------------------------------------------------------------------------- -# Signal tools -- write progress/completion into a shared dict +# Structured handoff state +# --------------------------------------------------------------------------- + +@dataclass +class HandoffState: + """Structured progress state passed between long-task steps.""" + + message: str = "" + files_created: list[str] = field(default_factory=list) + files_modified: list[str] = field(default_factory=list) + next_step_hint: str = "" + verification: str = "" + + def is_empty(self) -> bool: + return not any( + [ + self.message, + self.files_created, + self.files_modified, + self.next_step_hint, + self.verification, + ] + ) + + +# --------------------------------------------------------------------------- +# Signal tools -- write progress/completion into a shared state # --------------------------------------------------------------------------- @tool_parameters( @@ -24,13 +58,28 @@ if TYPE_CHECKING: "What you completed in this step and where results are saved. " "The next step will pick up from here.", ), + files_created=ArraySchema( + StringSchema(""), + description="List of file paths you created in this step", + ), + files_modified=ArraySchema( + StringSchema(""), + description="List of file paths you modified in this step", + ), + next_step_hint=StringSchema( + "A clear, specific hint about what the next step should do. " + "Be concrete — e.g. 'Implement the test cases in test_foo.py'", + ), + verification=StringSchema( + "Any verification you performed (tests run, lint passed, etc.)", + ), required=["message"], ) ) class HandoffTool(Tool): """Signal that the step is done but the overall task continues.""" - def __init__(self, store: dict[str, str]) -> None: + def __init__(self, store: HandoffState) -> None: self._store = store @property @@ -40,14 +89,25 @@ class HandoffTool(Tool): @property def description(self) -> str: return ( - "REQUIRED after finishing your work in this step. " - "Pass your progress summary to the next step. " - "Use complete() instead if the entire goal is achieved." + "You are done with this step. Pass control to the next step. " + "You MUST call this (or complete()) before your tool budget runs out. " + "Provide a detailed summary, list files changed, and hint the next step." ) - async def execute(self, message: str, **kwargs: Any) -> str: - self._store["type"] = "handoff" - self._store["payload"] = message + async def execute( + self, + message: str, + files_created: list[str] | None = None, + files_modified: list[str] | None = None, + next_step_hint: str = "", + verification: str = "", + **kwargs: Any, + ) -> str: + self._store.message = message + self._store.files_created = list(files_created or []) + self._store.files_modified = list(files_modified or []) + self._store.next_step_hint = next_step_hint + self._store.verification = verification return "Progress recorded. The next step will continue from here." @@ -60,7 +120,7 @@ class HandoffTool(Tool): class CompleteTool(Tool): """Signal that the entire long task is finished.""" - def __init__(self, store: dict[str, str]) -> None: + def __init__(self, store: HandoffState) -> None: self._store = store @property @@ -70,48 +130,94 @@ class CompleteTool(Tool): @property def description(self) -> str: return ( - "The ENTIRE goal is achieved. Call this only when nothing remains." + "The ENTIRE goal is achieved. Call this only when nothing remains. " + "Your claim will be validated — if unproven, the task continues." ) async def execute(self, summary: str, **kwargs: Any) -> str: - self._store["type"] = "complete" - self._store["payload"] = summary - return "Task marked as complete." + self._store.message = summary + return "Task marked as complete. Awaiting validation." # --------------------------------------------------------------------------- -# System prompt for long-task subagent steps +# Budget and prompt helpers # --------------------------------------------------------------------------- _STEP_BUDGET = 8 +_FINAL_STEP_BUDGET = 4 # Lower budget for final steps # Must match max_iterations_message set in SubagentManager.run_step() _BUDGET_EXHAUSTED_PREFIX = "Tool budget exhausted" -_LONG_TASK_SYSTEM_PROMPT = """\ -You are one step in a chain working toward a goal. -1. Check the filesystem to see what's already done. -2. Do the next piece of work. Write results to files as you go — \ -do NOT just collect information without producing output. -3. When done with your chunk, call handoff() with a brief summary. \ -If the entire goal is finished, call complete() instead. - -IMPORTANT: Write output to files early and often. If you run out of \ -tool calls, only what's on the filesystem survives. -""" +def _step_budget(step: int, max_steps: int) -> int: + """Compute per-step tool budget based on progress.""" + if step >= max_steps - 2: + return _FINAL_STEP_BUDGET + return _STEP_BUDGET -def _build_user_message(goal: str, step: int, handoff: str) -> str: - """Build the user message for a subagent step with budget warning.""" +def _build_system_prompt(budget: int) -> str: + """Build the system prompt for a subagent step.""" + return ( + "You are one step in a chain working toward a goal.\n\n" + "Rules:\n" + "1. Do ONE small chunk of work per step.\n" + "2. Write results to files — do NOT just collect information.\n" + "3. Call handoff() when done with your chunk. " + "Call complete() ONLY if the ENTIRE goal is achieved.\n" + f"4. You have {budget} tool calls. " + "Reserve the last 1-2 for handoff() or complete()." + ) + + +def _build_user_message( + goal: str, + step: int, + max_steps: int, + handoff: HandoffState, + correction: str | None = None, +) -> str: + """Build the user message for a subagent step using templates.""" + budget = _step_budget(step, max_steps) budget_note = ( f"\n\n---\n" - f"Step {step + 1}. You have {_STEP_BUDGET} tool calls total. " - f"Call handoff() or complete() before you run out." + f"Step {step + 1} of {max_steps}. You have {budget} tool calls for this step. " + f"Reserve the last 1-2 calls for handoff() or complete(). " + f"If you run out of calls without calling one, your progress is LOST." ) + if step == 0: - return goal + budget_note - return f"{goal}\n\n## Previous Progress\n{handoff}{budget_note}" + prompt = render_template( + "agent/long_task/step_start.md", + step=step, + max_steps=max_steps, + goal=goal, + budget=budget, + ) + elif step >= max_steps - 3: + prompt = render_template( + "agent/long_task/step_final.md", + step=step, + max_steps=max_steps, + goal=goal, + budget=budget, + handoff=handoff, + ) + else: + prompt = render_template( + "agent/long_task/step_middle.md", + step=step, + max_steps=max_steps, + goal=goal, + budget=budget, + handoff=handoff, + ) + + if correction: + prompt += f"\n\n## User Correction\n{correction}\n" + + return prompt + budget_note def _extract_handoff_from_messages(messages: list[dict[str, Any]]) -> str: @@ -132,6 +238,42 @@ def _extract_handoff_from_messages(messages: list[dict[str, Any]]) -> str: return "" +def _extract_file_changes( + tool_events: list[dict[str, Any]], +) -> tuple[list[str], list[str]]: + """Extract file creation/modification events from tool events.""" + created: list[str] = [] + modified: list[str] = [] + for event in tool_events: + name = event.get("name", "") + status = event.get("status", "") + detail = event.get("detail", "") + if status != "ok": + continue + if name in ("write_file", "edit_file"): + # Try to extract file path from detail + if detail.startswith("Wrote ") or detail.startswith("Edited "): + path = detail.split(" ", 1)[1].split(":")[0].strip() + if name == "write_file": + created.append(path) + else: + modified.append(path) + return created, modified + + +# --------------------------------------------------------------------------- +# Observability: events and hooks +# --------------------------------------------------------------------------- + +@dataclass +class LongTaskEvent: + """Event emitted during long-task execution for observability.""" + + type: str + payload: dict[str, Any] = field(default_factory=dict) + timestamp: float = field(default_factory=time.time) + + # --------------------------------------------------------------------------- # Long Task Tool — the orchestrator # --------------------------------------------------------------------------- @@ -152,14 +294,32 @@ class LongTaskTool(Tool): def __init__(self, manager: SubagentManager) -> None: self._manager = manager + self._hooks: dict[str, Any] = {} + self._reset_state() - @classmethod - def enabled(cls, ctx: ToolContext) -> bool: - return ctx.subagent_manager is not None + def _reset_state(self) -> None: + """Reset internal state before a new execution. - @classmethod - def create(cls, ctx: ToolContext) -> Tool: - return cls(manager=ctx.subagent_manager) + Preserves any pending user corrections so inject_correction() can be + called before execute() starts. + """ + existing_signals = ( + self._state.get("signal_queue", []) if hasattr(self, "_state") else [] + ) + self._state: dict[str, Any] = { + "current_step": 0, + "total_steps": 0, + "goal": "", + "status": "idle", # idle, running, validating, completed, error + "last_handoff": HandoffState(), + "cumulative_usage": { + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0, + }, + "signal_queue": existing_signals, + "error": None, + } @property def name(self) -> str: @@ -176,61 +336,315 @@ class LongTaskTool(Tool): "goal. For simple independent tasks, use spawn instead." ) - async def execute(self, goal: str, max_steps: int = 20, **kwargs: Any) -> str: - handoff = "" - logger.debug("long_task start: max_steps={}, goal={:.120}", max_steps, goal) - for step in range(max_steps): - signal_store: dict[str, str] = {} - user_msg = _build_user_message(goal, step, handoff) + @classmethod + def enabled(cls, ctx: ToolContext) -> bool: + return ctx.subagent_manager is not None + + @classmethod + def create(cls, ctx: ToolContext) -> Tool: + return cls(manager=ctx.subagent_manager) + + # --- State exposure for WebUI observability --- + + @property + def current_step(self) -> int: + return self._state["current_step"] + + @property + def total_steps(self) -> int: + return self._state["total_steps"] + + @property + def status(self) -> str: + return self._state["status"] + + @property + def last_handoff(self) -> HandoffState: + return self._state["last_handoff"] + + @property + def cumulative_usage(self) -> dict[str, int]: + return dict(self._state["cumulative_usage"]) + + @property + def goal(self) -> str: + return self._state["goal"] + + # --- External signal mechanism (for user correction) --- + + def inject_correction(self, message: str) -> None: + """Inject a user correction message to be read before the next step.""" + self._state["signal_queue"].append(message) + logger.info("LongTask correction injected: {}", message[:120]) + + def _pop_signal(self) -> str | None: + """Consume and return the oldest pending correction, if any.""" + if self._state["signal_queue"]: + return self._state["signal_queue"].pop(0) + return None + + # --- Hook system for WebUI and logging --- + + def set_hooks(self, hooks: dict[str, Any]) -> None: + """Register observability hooks. + + Supported hooks (all optional): + - on_task_start(goal, max_steps) + - on_step_start(step, goal, budget) + - on_step_complete(step, result, handoff) + - on_handoff(step, handoff) + - on_validation_started(step, completion_summary) + - on_validation_passed(step, summary) + - on_validation_failed(step, reason) + - on_task_complete(step, summary) + - on_task_error(step, error) + - on_event(event: LongTaskEvent) # catch-all + """ + self._hooks = dict(hooks) + + def _emit(self, event_type: str, **payload: Any) -> None: + """Emit an event to registered hooks.""" + event = LongTaskEvent(type=event_type, payload=payload) + logger.debug("LongTask event: {} | {}", event_type, payload) + + # Call catch-all hook + catch_all = self._hooks.get("on_event") + if catch_all is not None: try: - result = await self._manager.run_step( - system_prompt=_LONG_TASK_SYSTEM_PROMPT, - user_message=user_msg, - extra_tools=[HandoffTool(signal_store), CompleteTool(signal_store)], - ) + catch_all(event) except Exception: - logger.exception("long_task step {}/{} failed", step + 1, max_steps) - if handoff: + logger.exception("LongTask on_event hook failed") + + # Call specific hook + hook_name = f"on_{event_type}" + hook = self._hooks.get(hook_name) + if hook is not None: + try: + hook(**payload) + except Exception: + logger.exception("LongTask {} hook failed", hook_name) + + # --- Core execution --- + + async def execute(self, goal: str, max_steps: int = 20, **kwargs: Any) -> str: + handoff = HandoffState() + self._reset_state() + self._state["goal"] = goal + self._state["total_steps"] = max_steps + self._state["status"] = "running" + + logger.debug("long_task start: max_steps={}, goal={:.120}", max_steps, goal) + self._emit("task_start", goal=goal, max_steps=max_steps) + + for step in range(max_steps): + self._state["current_step"] = step + signal_store = HandoffState() + correction = self._pop_signal() + user_msg = _build_user_message( + goal, step, max_steps, handoff, correction=correction + ) + + budget = _step_budget(step, max_steps) + self._emit("step_start", step=step, goal=goal, budget=budget) + + # Run the step with retry on crash + result = await self._run_step_with_retry( + system_prompt=_build_system_prompt(budget), + user_message=user_msg, + extra_tools=[HandoffTool(signal_store), CompleteTool(signal_store)], + step=step, + budget=budget, + ) + + if result is None: + # Fatal error after retry + self._state["status"] = "error" + self._emit("task_error", step=step, error=self._state["error"]) + if handoff.message: return ( f"Long task failed at step {step + 1}/{max_steps}. " - f"Last progress:\n{handoff}" + f"Last progress:\n{handoff.message}" ) return f"Long task failed at step {step + 1}/{max_steps}." - sig_type = signal_store.get("type") - sig_payload = signal_store.get("payload", "") + + # Accumulate usage + usage = getattr(result, "usage", {}) or {} + for key in ("prompt_tokens", "completion_tokens", "total_tokens"): + self._state["cumulative_usage"][key] += usage.get(key, 0) + + # Extract file changes from tool events for automatic tracking + tool_events = getattr(result, "tool_events", []) or [] + auto_created, auto_modified = _extract_file_changes(tool_events) + if auto_created or auto_modified: + logger.debug( + "long_task step {}: auto-detected files created={}, modified={}", + step + 1, + auto_created, + auto_modified, + ) + + self._emit("step_complete", step=step, result=result, handoff=signal_store) + + # Determine signal from tool events + sig_type = "none" + for event in tool_events: + ev_name = event.get("name", "") + if ev_name == "complete": + sig_type = "complete" + break + elif ev_name == "handoff": + sig_type = "handoff" + break + + # Fallback: if no explicit signal but CompleteTool/HandoffTool was + # called without arguments (message empty), use final_content + if sig_type == "none" and signal_store.message: + # Tool was called but we couldn't detect from events; + # use the store content as handoff + sig_type = "handoff" + elif sig_type == "none": + signal_store.message = _extract_handoff_from_messages( + getattr(result, "messages", []) or [] + ) + if signal_store.message: + sig_type = "handoff" + + sig_payload = signal_store.message logger.info( "long_task step {}/{}: signal={}, stop_reason={}, tools={}", - step + 1, max_steps, sig_type or "auto", + step + 1, + max_steps, + sig_type, result.stop_reason, result.tools_used, ) - if sig_type == "complete": - logger.debug( - "long_task done at step {}: complete payload={:.200}", - step + 1, sig_payload, - ) - return sig_payload - # Auto-extract progress — don't require handoff() - if sig_type == "handoff": - handoff = sig_payload - logger.debug("long_task step {} handoff: {:.200}", step + 1, handoff) - elif result.stop_reason == "completed": - # Subagent returned text naturally (no more tool calls) - handoff = result.final_content or "" - logger.debug( - "long_task step {} natural end: {:.200}", - step + 1, handoff[:200] if handoff else "(empty)", + if sig_type == "complete": + # Validation round + self._state["status"] = "validating" + self._emit( + "validation_started", + step=step, + completion_summary=sig_payload, ) + + validated = await self._validate_completion( + goal, sig_payload, max_steps + ) + if validated: + self._state["status"] = "completed" + self._emit("task_complete", step=step, summary=sig_payload) + return sig_payload + else: + self._emit( + "validation_failed", + step=step, + reason="Validation did not confirm completion", + ) + # Fall through to handoff — continue working + handoff = signal_store + handoff.next_step_hint = ( + f"Validation failed. Continue working toward the goal. " + f"Previous claim: {sig_payload}" + ) + self._state["last_handoff"] = handoff + continue + + elif sig_type == "handoff": + self._emit("handoff_received", step=step, handoff=signal_store) + # Merge auto-detected file changes if not explicitly reported + if auto_created and not signal_store.files_created: + signal_store.files_created = auto_created + if auto_modified and not signal_store.files_modified: + signal_store.files_modified = auto_modified + handoff = signal_store + self._state["last_handoff"] = handoff + continue + else: - # max_iterations hit — extract whatever text the subagent produced - handoff = _extract_handoff_from_messages(result.messages) - logger.debug( - "long_task step {} auto-extract: {:.200}", - step + 1, handoff[:200] if handoff else "(empty)", - ) - logger.warning("long_task exhausted max_steps={}", max_steps) + # No signal — use extracted content as handoff + handoff = HandoffState(message=signal_store.message) + self._state["last_handoff"] = handoff + + self._state["status"] = "error" + self._emit("task_error", step=max_steps, error="Max steps reached") return ( f"Long task reached max steps ({max_steps}). " - f"Last progress:\n{handoff}" + f"Last progress:\n{handoff.message}" ) + + async def _run_step_with_retry( + self, + system_prompt: str, + user_message: str, + extra_tools: list[Any], + step: int, + budget: int, + ) -> Any: + """Run a single step with one retry on crash.""" + try: + return await self._manager.run_step( + system_prompt=system_prompt, + user_message=user_message, + extra_tools=extra_tools, + max_iterations=budget, + ) + except Exception as first_err: + logger.warning( + "long_task step {}/{} crashed (will retry once): {}", + step + 1, + self._state["total_steps"], + first_err, + ) + try: + return await self._manager.run_step( + system_prompt=system_prompt, + user_message=user_message, + extra_tools=extra_tools, + max_iterations=budget, + ) + except Exception as second_err: + logger.exception( + "long_task step {}/{} failed after retry", + step + 1, + self._state["total_steps"], + ) + self._state["error"] = str(second_err) + return None + + async def _validate_completion( + self, goal: str, completion_summary: str, max_steps: int + ) -> bool: + """Run a validation step to verify the completion claim.""" + try: + validation_store = HandoffState() + validation_prompt = render_template( + "agent/long_task/validation.md", + goal=goal, + completion_summary=completion_summary, + ) + result = await self._manager.run_step( + system_prompt=validation_prompt, + user_message="Validate the claimed completion. " + "Call complete() if verified, handoff() if not.", + extra_tools=[ + HandoffTool(validation_store), + CompleteTool(validation_store), + ], + max_iterations=4, # Short validation step + ) + # If complete() was called, validation passed + tool_events = getattr(result, "tool_events", []) or [] + for event in tool_events: + if event.get("name") == "complete": + self._emit("validation_passed", summary=completion_summary) + return True + + self._emit( + "validation_failed", + reason=validation_store.message or "Validator did not confirm", + ) + return False + except Exception: + logger.exception("Validation step failed") + return False diff --git a/nanobot/templates/agent/long_task/step_final.md b/nanobot/templates/agent/long_task/step_final.md new file mode 100644 index 000000000..96d099ad0 --- /dev/null +++ b/nanobot/templates/agent/long_task/step_final.md @@ -0,0 +1,28 @@ +# Long Task — FINAL Step {{ step + 1 }}/{{ max_steps }} + +**This is one of the LAST steps. You are running out of budget.** + +## Goal +{{ goal }} + +## Previous Progress +{% if handoff.message %} +{{ handoff.message }} +{% endif %} +{% if handoff.files_created or handoff.files_modified %} + +### Files Changed +{% for f in handoff.files_created %} +- Created: `{{ f }}` +{% endfor %} +{% for f in handoff.files_modified %} +- Modified: `{{ f }}` +{% endfor %} +{% endif %} + +## Instructions +1. **Do NOT start new work**. Only finish what is already in progress. +2. **Wrap up**: Complete any partial work, write final results to files. +3. **Final handoff**: Call `handoff()` with a clear summary of what remains unfinished. Call `complete()` ONLY if you are 100% sure everything is done. + +You have {{ budget }} tool calls total. Reserve the last 1-2 calls for `handoff()` or `complete()`. diff --git a/nanobot/templates/agent/long_task/step_middle.md b/nanobot/templates/agent/long_task/step_middle.md new file mode 100644 index 000000000..2b7df9d45 --- /dev/null +++ b/nanobot/templates/agent/long_task/step_middle.md @@ -0,0 +1,38 @@ +# Long Task — Step {{ step + 1 }}/{{ max_steps }} + +You are one step in a chain working toward a goal. + +## Goal +{{ goal }} + +## Previous Progress +{% if handoff.message %} +{{ handoff.message }} +{% endif %} +{% if handoff.files_created or handoff.files_modified %} + +### Files Changed +{% for f in handoff.files_created %} +- Created: `{{ f }}` +{% endfor %} +{% for f in handoff.files_modified %} +- Modified: `{{ f }}` +{% endfor %} +{% endif %} +{% if handoff.next_step_hint %} + +### Suggested Next Step +{{ handoff.next_step_hint }} +{% endif %} +{% if handoff.verification %} + +### Verification +{{ handoff.verification }} +{% endif %} + +## Instructions +1. **Check existing work**: Use the file list above — do NOT re-explore files already handled. +2. **Do the next chunk**: Make concrete progress. Write results to files. +3. **Handoff**: Call `handoff()` with your progress summary, files changed, and a hint for the next step. Call `complete()` only if the ENTIRE goal is achieved. + +You have {{ budget }} tool calls total. Reserve the last 1-2 calls for `handoff()` or `complete()`. diff --git a/nanobot/templates/agent/long_task/step_start.md b/nanobot/templates/agent/long_task/step_start.md new file mode 100644 index 000000000..4440eb600 --- /dev/null +++ b/nanobot/templates/agent/long_task/step_start.md @@ -0,0 +1,14 @@ +# Long Task — Step {{ step + 1 }}/{{ max_steps }} + +You are the FIRST step in a chain working toward a goal. + +## Goal +{{ goal }} + +## Instructions +1. **Explore first**: Check the filesystem to understand the current state. Do NOT assume anything. +2. **Plan your work**: Decide what chunk you will do in this step. +3. **Do the work**: Make concrete progress. Write results to files — do NOT just collect information without producing output. +4. **Handoff**: When done, call `handoff()` with a detailed summary. If the ENTIRE goal is already achieved, call `complete()` instead. + +You have {{ budget }} tool calls total. Reserve the last 1-2 calls for `handoff()` or `complete()`. diff --git a/nanobot/templates/agent/long_task/validation.md b/nanobot/templates/agent/long_task/validation.md new file mode 100644 index 000000000..1cc909365 --- /dev/null +++ b/nanobot/templates/agent/long_task/validation.md @@ -0,0 +1,16 @@ +# Validation — Confirm Goal Completion + +## Original Goal +{{ goal }} + +## Claimed Completion +{{ completion_summary }} + +## Instructions +You are a strict validator. Review the claimed completion against the original goal. + +1. Check every requirement in the goal. Is each one actually satisfied by evidence (files created, tests passing, etc.)? +2. If ANY requirement is unproven or incomplete, call `handoff()` with what is missing. +3. If ALL requirements are satisfied with evidence, call `complete()` confirming the validation. + +Be skeptical. "Looks correct" is not enough — verify against the filesystem. diff --git a/tests/agent/tools/test_long_task.py b/tests/agent/tools/test_long_task.py index 1bece10c2..c3bc5646c 100644 --- a/tests/agent/tools/test_long_task.py +++ b/tests/agent/tools/test_long_task.py @@ -2,53 +2,82 @@ import pytest from types import SimpleNamespace - from unittest.mock import AsyncMock, MagicMock +from nanobot.agent.tools.long_task import ( + HandoffState, + HandoffTool, + CompleteTool, + LongTaskTool, + LongTaskEvent, + _build_system_prompt, + _build_user_message, + _extract_file_changes, + _extract_handoff_from_messages, +) + + +# --------------------------------------------------------------------------- +# Signal tool tests +# --------------------------------------------------------------------------- + @pytest.mark.asyncio -async def test_handoff_tool_stores_signal(): - from nanobot.agent.tools.long_task import HandoffTool - - store: dict[str, str] = {} +async def test_handoff_tool_stores_structured_signal(): + store = HandoffState() tool = HandoffTool(store) - result = await tool.execute(message="Processed items 1-8. Results in out.md. Continue with item 9.") + result = await tool.execute( + message="Processed items 1-8. Results in out.md.", + files_created=["out.md", "report.md"], + files_modified=["main.py"], + next_step_hint="Continue with item 9.", + verification="Tests passed", + ) assert result == "Progress recorded. The next step will continue from here." - assert store["type"] == "handoff" - assert store["payload"] == "Processed items 1-8. Results in out.md. Continue with item 9." + assert store.message == "Processed items 1-8. Results in out.md." + assert store.files_created == ["out.md", "report.md"] + assert store.files_modified == ["main.py"] + assert store.next_step_hint == "Continue with item 9." + assert store.verification == "Tests passed" + + +@pytest.mark.asyncio +async def test_handoff_tool_defaults_optional_fields(): + store = HandoffState() + tool = HandoffTool(store) + await tool.execute(message="Done.") + assert store.files_created == [] + assert store.files_modified == [] + assert store.next_step_hint == "" + assert store.verification == "" @pytest.mark.asyncio async def test_complete_tool_stores_signal(): - from nanobot.agent.tools.long_task import CompleteTool - - store: dict[str, str] = {} + store = HandoffState() tool = CompleteTool(store) result = await tool.execute(summary="All 100 items processed. Summary in report.md") - assert result == "Task marked as complete." - assert store["type"] == "complete" - assert store["payload"] == "All 100 items processed. Summary in report.md" + assert result == "Task marked as complete. Awaiting validation." + assert store.message == "All 100 items processed. Summary in report.md" @pytest.mark.asyncio async def test_signal_tools_overwrite_on_multiple_calls(): - """Last call wins -- the orchestrator only reads the final signal.""" - from nanobot.agent.tools.long_task import HandoffTool, CompleteTool - - store: dict[str, str] = {} + """Last call wins — the orchestrator only reads the final signal.""" + store = HandoffState() handoff = HandoffTool(store) complete = CompleteTool(store) await handoff.execute(message="first progress") - assert store["type"] == "handoff" + assert store.message == "first progress" await complete.execute(summary="done early") - assert store["type"] == "complete" - assert store["payload"] == "done early" + assert store.message == "done early" # --------------------------------------------------------------------------- # Helper: minimal SubagentManager stub # --------------------------------------------------------------------------- + def _make_manager_stub(): """Create a minimal SubagentManager stub with a mockable run_step.""" mgr = MagicMock() @@ -64,6 +93,7 @@ def _step_result(**overrides): tool_events=[], stop_reason="completed", tools_used=[], + usage={}, ) defaults.update(overrides) return SimpleNamespace(**defaults) @@ -76,68 +106,150 @@ def _step_result(**overrides): @pytest.mark.asyncio async def test_long_task_completes_in_one_step(): - """Subagent calls complete() immediately.""" - from nanobot.agent.tools.long_task import LongTaskTool - + """Subagent calls complete() immediately, validation passes.""" mgr = _make_manager_stub() + call_count = 0 - async def fake_run_step(*, system_prompt, user_message, extra_tools): - for t in extra_tools: - if t.name == "complete": - await t.execute(summary="All done. Report in summary.md") - return _step_result( - final_content="All done.", - tools_used=["complete"], - ) + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + nonlocal call_count + call_count += 1 + if call_count == 1: + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="All done. Report in summary.md") + return _step_result( + final_content="All done.", + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + else: + # Validation round + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Validated") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) mgr.run_step.side_effect = fake_run_step tool = LongTaskTool(manager=mgr) result = await tool.execute(goal="Audit all issues.") assert result == "All done. Report in summary.md" + assert call_count == 2 # main step + validation @pytest.mark.asyncio async def test_long_task_completes_after_multiple_handoffs(): - """Subagent calls handoff() twice then complete().""" - from nanobot.agent.tools.long_task import LongTaskTool - + """Subagent calls handoff() twice then complete(), validation passes.""" mgr = _make_manager_stub() call_count = 0 - async def fake_run_step(*, system_prompt, user_message, extra_tools): + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): nonlocal call_count call_count += 1 if call_count == 1: for t in extra_tools: if t.name == "handoff": await t.execute(message="Processed 1-8.") + return _step_result( + tools_used=["handoff"], + tool_events=[{"name": "handoff", "status": "ok", "detail": ""}], + ) elif call_count == 2: assert "Processed 1-8." in user_message - assert "8 tool calls" in user_message + assert "Step 2" in user_message or "Step 2 of" in user_message for t in extra_tools: if t.name == "handoff": await t.execute(message="Processed 9-16.") - else: + return _step_result( + tools_used=["handoff"], + tool_events=[{"name": "handoff", "status": "ok", "detail": ""}], + ) + elif call_count == 3: for t in extra_tools: if t.name == "complete": await t.execute(summary="All 16 items audited.") - return _step_result(tools_used=["handoff"]) + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + else: + # Validation round + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Validated") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) mgr.run_step.side_effect = fake_run_step tool = LongTaskTool(manager=mgr) result = await tool.execute(goal="Audit 16 issues.") assert result == "All 16 items audited." - assert call_count == 3 + assert call_count == 4 # 3 main steps + validation + + +@pytest.mark.asyncio +async def test_long_task_validation_falls_back_to_handoff(): + """Subagent claims complete but validation fails — task continues.""" + mgr = _make_manager_stub() + call_count = 0 + + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + nonlocal call_count + call_count += 1 + if call_count == 1: + # First step claims complete + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Done.") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + elif call_count == 2: + # Validation round fails (handoff called) + for t in extra_tools: + if t.name == "handoff": + await t.execute(message="Not actually done. Need more work.") + return _step_result( + tools_used=["handoff"], + tool_events=[{"name": "handoff", "status": "ok", "detail": ""}], + ) + elif call_count == 3: + # Continue and complete for real + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Really done.") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + else: + # Second validation passes + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Validated") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + result = await tool.execute(goal="Do something.", max_steps=5) + assert "Really done." == result + assert call_count == 4 @pytest.mark.asyncio async def test_long_task_fallback_when_no_signal_called(): """Subagent doesn't call handoff/complete — extract progress from messages.""" - from nanobot.agent.tools.long_task import LongTaskTool - mgr = _make_manager_stub() - async def fake_run_step(*, system_prompt, user_message, extra_tools): + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): return _step_result( final_content="Tool budget exhausted.", messages=[ @@ -161,57 +273,227 @@ async def test_long_task_fallback_when_no_signal_called(): @pytest.mark.asyncio async def test_long_task_auto_extracts_on_natural_end(): """Subagent finishes naturally (stop_reason=completed) without calling signal.""" - from nanobot.agent.tools.long_task import LongTaskTool - mgr = _make_manager_stub() - steps = 0 + call_count = 0 - async def fake_run_step(*, system_prompt, user_message, extra_tools): - nonlocal steps - steps += 1 - if steps == 1: + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + nonlocal call_count + call_count += 1 + if call_count == 1: return _step_result( final_content="I processed items 1-5. Results in out.md.", stop_reason="completed", ) - # Second step: subagent calls complete - for t in extra_tools: - if t.name == "complete": - await t.execute(summary="All done.") - return _step_result( - final_content="All done.", - tools_used=["complete"], - ) + elif call_count == 2: + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="All done.") + return _step_result( + final_content="All done.", + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + else: + # Validation + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Validated") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) mgr.run_step.side_effect = fake_run_step tool = LongTaskTool(manager=mgr) result = await tool.execute(goal="Process items.", max_steps=5) assert "All done." == result - assert steps == 2 + assert call_count == 3 @pytest.mark.asyncio -async def test_long_task_goal_appears_in_system_prompt(): - """Verify every step's system_prompt contains the long task system prompt.""" - from nanobot.agent.tools.long_task import LongTaskTool - +async def test_long_task_retries_on_crash(): + """A step that crashes once should be retried.""" mgr = _make_manager_stub() - captured_prompts = [] + call_count = 0 - async def fake_run_step(*, system_prompt, user_message, extra_tools): - captured_prompts.append(system_prompt) + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise RuntimeError("Simulated crash") for t in extra_tools: if t.name == "complete": - await t.execute(summary="done") - return _step_result(final_content="done") + await t.execute(summary="Recovered.") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) mgr.run_step.side_effect = fake_run_step tool = LongTaskTool(manager=mgr) - await tool.execute(goal="Audit everything.") - assert len(captured_prompts) == 1 - assert "handoff()" in captured_prompts[0] - assert "complete()" in captured_prompts[0] - assert "filesystem" in captured_prompts[0] + result = await tool.execute(goal="Test retry.") + assert "Recovered." == result + assert call_count == 3 # main step + retry + validation + + +@pytest.mark.asyncio +async def test_long_task_fails_after_two_crashes(): + """A step that crashes twice should terminate the task.""" + mgr = _make_manager_stub() + + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + raise RuntimeError("Persistent crash") + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + result = await tool.execute(goal="Test failure.", max_steps=3) + assert "failed at step 1/3" in result + assert tool.status == "error" + + +@pytest.mark.asyncio +async def test_long_task_uses_dynamic_budget(): + """Final steps should use lower max_iterations.""" + mgr = _make_manager_stub() + captured_budgets = [] + + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + captured_budgets.append(max_iterations) + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Done.") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + await tool.execute(goal="Test budget.", max_steps=5) + # Step 0-2 should use 8, step 3+ should use 4 + # But we complete on step 0, so only one budget captured + assert captured_budgets[0] == 8 + + +# --------------------------------------------------------------------------- +# Hook and observability tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_hooks_receive_events(): + """Registered hooks should be called during execution.""" + mgr = _make_manager_stub() + events = [] + + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Done.") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + tool.set_hooks({ + "on_task_start": lambda **kw: events.append(("task_start", kw)), + "on_step_start": lambda **kw: events.append(("step_start", kw)), + "on_step_complete": lambda **kw: events.append(("step_complete", kw)), + "on_validation_started": lambda **kw: events.append(("validation_started", kw)), + "on_task_complete": lambda **kw: events.append(("task_complete", kw)), + }) + await tool.execute(goal="Test hooks.") + + assert any(e[0] == "task_start" for e in events) + assert any(e[0] == "step_start" for e in events) + assert any(e[0] == "step_complete" for e in events) + assert any(e[0] == "validation_started" for e in events) + assert any(e[0] == "task_complete" for e in events) + + +@pytest.mark.asyncio +async def test_catch_all_hook_receives_events(): + """The on_event catch-all hook should receive all events.""" + mgr = _make_manager_stub() + events = [] + + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Done.") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + tool.set_hooks({ + "on_event": lambda ev: events.append(ev.type), + }) + await tool.execute(goal="Test catch-all.") + + assert "task_start" in events + assert "step_start" in events + assert "task_complete" in events + + +@pytest.mark.asyncio +async def test_state_exposure(): + """Properties should reflect current execution state.""" + mgr = _make_manager_stub() + + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + for t in extra_tools: + if t.name == "handoff": + await t.execute(message="Progress.") + return _step_result( + tools_used=["handoff"], + tool_events=[{"name": "handoff", "status": "ok", "detail": ""}], + ) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + assert tool.status == "idle" + + # Start execution in background so we can inspect mid-run + import asyncio + task = asyncio.create_task(tool.execute(goal="Test state.", max_steps=3)) + # Give it a moment to start + await asyncio.sleep(0.01) + # Task should have finished by now since mocks are instant + await task + + assert tool.goal == "Test state." + assert tool.total_steps == 3 + assert tool.status == "error" # max_steps reached + assert tool.last_handoff.message == "Progress." + + +@pytest.mark.asyncio +async def test_inject_correction(): + """User correction should appear in the next step's user message.""" + mgr = _make_manager_stub() + captured_messages = [] + + async def fake_run_step(*, system_prompt, user_message, extra_tools, max_iterations=None): + captured_messages.append(user_message) + for t in extra_tools: + if t.name == "complete": + await t.execute(summary="Done.") + return _step_result( + tools_used=["complete"], + tool_events=[{"name": "complete", "status": "ok", "detail": ""}], + ) + + mgr.run_step.side_effect = fake_run_step + tool = LongTaskTool(manager=mgr) + tool.inject_correction("Focus on error handling.") + await tool.execute(goal="Refactor code.") + + assert any("Focus on error handling" in msg for msg in captured_messages) # --------------------------------------------------------------------------- @@ -219,30 +501,48 @@ async def test_long_task_goal_appears_in_system_prompt(): # --------------------------------------------------------------------------- -def test_build_user_message_step_0(): - from nanobot.agent.tools.long_task import _build_user_message +def test_build_system_prompt(): + prompt = _build_system_prompt(budget=8) + assert "handoff()" in prompt + assert "complete()" in prompt + assert "8 tool calls" in prompt - msg = _build_user_message("Audit all issues.", step=0, handoff="") - assert msg.startswith("Audit all issues.") - assert "Step 1" in msg + +def test_build_user_message_step_0(): + msg = _build_user_message("Audit all issues.", step=0, max_steps=20, handoff=HandoffState()) + assert "Audit all issues." in msg + assert "Step 1 of 20" in msg assert "8 tool calls" in msg assert "Previous Progress" not in msg def test_build_user_message_later_step(): - from nanobot.agent.tools.long_task import _build_user_message - - msg = _build_user_message("Audit all issues.", step=3, handoff="Did 1-10.") + handoff = HandoffState(message="Did 1-10.", files_created=["a.py"], next_step_hint="Do Y") + msg = _build_user_message("Audit all issues.", step=3, max_steps=20, handoff=handoff) assert "Audit all issues." in msg assert "Previous Progress" in msg assert "Did 1-10." in msg - assert "Step 4" in msg - assert "8 tool calls" in msg + assert "a.py" in msg + assert "Do Y" in msg + assert "Step 4 of 20" in msg + + +def test_build_user_message_final_step(): + handoff = HandoffState(message="Almost done.") + msg = _build_user_message("Audit all issues.", step=18, max_steps=20, handoff=handoff) + assert "FINAL Step" in msg + assert "4 tool calls" in msg # final steps use lower budget + + +def test_build_user_message_with_correction(): + msg = _build_user_message( + "Audit.", step=0, max_steps=20, handoff=HandoffState(), correction="Skip file A" + ) + assert "User Correction" in msg + assert "Skip file A" in msg def test_extract_handoff_from_messages(): - from nanobot.agent.tools.long_task import _extract_handoff_from_messages - messages = [ {"role": "system", "content": "sys"}, {"role": "user", "content": "do it"}, @@ -254,8 +554,6 @@ def test_extract_handoff_from_messages(): def test_extract_handoff_skips_budget_message(): - from nanobot.agent.tools.long_task import _extract_handoff_from_messages - messages = [ {"role": "system", "content": "sys"}, {"role": "user", "content": "do it"}, @@ -268,12 +566,26 @@ def test_extract_handoff_skips_budget_message(): def test_extract_handoff_from_empty_messages(): - from nanobot.agent.tools.long_task import _extract_handoff_from_messages - assert _extract_handoff_from_messages([]) == "" assert _extract_handoff_from_messages([{"role": "system", "content": "sys"}]) == "" +def test_extract_file_changes_from_tool_events(): + events = [ + {"name": "write_file", "status": "ok", "detail": "Wrote /workspace/a.py: done"}, + {"name": "edit_file", "status": "ok", "detail": "Edited /workspace/b.py: patched"}, + {"name": "read_file", "status": "ok", "detail": "Read /workspace/c.py"}, + {"name": "write_file", "status": "error", "detail": "Failed"}, + ] + created, modified = _extract_file_changes(events) + assert created == ["/workspace/a.py"] + assert modified == ["/workspace/b.py"] + + +def test_extract_file_changes_empty(): + assert _extract_file_changes([]) == ([], []) + + # --------------------------------------------------------------------------- # Integration: verify LongTaskTool is wired into the main agent loop # ---------------------------------------------------------------------------