feat(subagent): add real-time monitoring for subagent execution

Track subagent progress via SubagentStatus dataclass that captures
phase, iteration, tool events, usage, and errors. Wire through
checkpoint_callback for phase transitions and _SubagentHook.after_iteration
for per-iteration updates. Enhance SelfTool inspect to display rich
subagent status via dot-path navigation.
This commit is contained in:
chengyongru 2026-04-14 21:58:33 +08:00
parent e939ac7049
commit b5cd23bf5e
4 changed files with 474 additions and 22 deletions

View File

@ -257,6 +257,7 @@ class AgentLoop:
self._runtime_vars: dict[str, Any] = {} self._runtime_vars: dict[str, Any] = {}
self._unregistered_tools: dict[str, Tool] = {} self._unregistered_tools: dict[str, Tool] = {}
self._config_snapshots: dict[str, dict[str, Any]] = {} self._config_snapshots: dict[str, dict[str, Any]] = {}
if self_evolution:
self._backup_critical_tools() self._backup_critical_tools()
self.commands = CommandRouter() self.commands = CommandRouter()
register_builtin_commands(self.commands) register_builtin_commands(self.commands)

View File

@ -2,7 +2,9 @@
import asyncio import asyncio
import json import json
import time
import uuid import uuid
from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -23,12 +25,29 @@ from nanobot.config.schema import ExecToolConfig, WebToolsConfig
from nanobot.providers.base import LLMProvider from nanobot.providers.base import LLMProvider
class _SubagentHook(AgentHook): @dataclass(slots=True)
"""Logging-only hook for subagent execution.""" class SubagentStatus:
"""Real-time status of a running subagent."""
def __init__(self, task_id: str) -> None: task_id: str
label: str
task_description: str
started_at: float # time.monotonic()
phase: str = "initializing" # initializing | awaiting_tools | tools_completed | final_response | done | error
iteration: int = 0
tool_events: list = field(default_factory=list) # [{name, status, detail}, ...]
usage: dict = field(default_factory=dict) # token usage
stop_reason: str | None = None
error: str | None = None
class _SubagentHook(AgentHook):
"""Hook for subagent execution — logs tool calls and updates status."""
def __init__(self, task_id: str, status: SubagentStatus | None = None) -> None:
super().__init__() super().__init__()
self._task_id = task_id self._task_id = task_id
self._status = status
async def before_execute_tools(self, context: AgentHookContext) -> None: async def before_execute_tools(self, context: AgentHookContext) -> None:
for tool_call in context.tool_calls: for tool_call in context.tool_calls:
@ -38,6 +57,15 @@ class _SubagentHook(AgentHook):
self._task_id, tool_call.name, args_str, self._task_id, tool_call.name, args_str,
) )
async def after_iteration(self, context: AgentHookContext) -> None:
if self._status is None:
return
self._status.iteration = context.iteration
self._status.tool_events = list(context.tool_events)
self._status.usage = dict(context.usage)
if context.error:
self._status.error = str(context.error)
class SubagentManager: class SubagentManager:
"""Manages background subagent execution.""" """Manages background subagent execution."""
@ -67,6 +95,7 @@ class SubagentManager:
self.disabled_skills = set(disabled_skills or []) self.disabled_skills = set(disabled_skills or [])
self.runner = AgentRunner(provider) self.runner = AgentRunner(provider)
self._running_tasks: dict[str, asyncio.Task[None]] = {} self._running_tasks: dict[str, asyncio.Task[None]] = {}
self._task_statuses: dict[str, SubagentStatus] = {}
self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...} self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...}
async def spawn( async def spawn(
@ -82,8 +111,16 @@ class SubagentManager:
display_label = label or task[:30] + ("..." if len(task) > 30 else "") display_label = label or task[:30] + ("..." if len(task) > 30 else "")
origin = {"channel": origin_channel, "chat_id": origin_chat_id} origin = {"channel": origin_channel, "chat_id": origin_chat_id}
status = SubagentStatus(
task_id=task_id,
label=display_label,
task_description=task,
started_at=time.monotonic(),
)
self._task_statuses[task_id] = status
bg_task = asyncio.create_task( bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin) self._run_subagent(task_id, task, display_label, origin, status)
) )
self._running_tasks[task_id] = bg_task self._running_tasks[task_id] = bg_task
if session_key: if session_key:
@ -91,6 +128,7 @@ class SubagentManager:
def _cleanup(_: asyncio.Task) -> None: def _cleanup(_: asyncio.Task) -> None:
self._running_tasks.pop(task_id, None) self._running_tasks.pop(task_id, None)
self._task_statuses.pop(task_id, None)
if session_key and (ids := self._session_tasks.get(session_key)): if session_key and (ids := self._session_tasks.get(session_key)):
ids.discard(task_id) ids.discard(task_id)
if not ids: if not ids:
@ -107,10 +145,15 @@ class SubagentManager:
task: str, task: str,
label: str, label: str,
origin: dict[str, str], origin: dict[str, str],
status: SubagentStatus,
) -> None: ) -> None:
"""Execute the subagent task and announce the result.""" """Execute the subagent task and announce the result."""
logger.info("Subagent [{}] starting task: {}", task_id, label) logger.info("Subagent [{}] starting task: {}", task_id, label)
async def _on_checkpoint(payload: dict) -> None:
status.phase = payload.get("phase", status.phase)
status.iteration = payload.get("iteration", status.iteration)
try: try:
# Build subagent tools (no message tool, no spawn tool) # Build subagent tools (no message tool, no spawn tool)
tools = ToolRegistry() tools = ToolRegistry()
@ -145,12 +188,16 @@ class SubagentManager:
model=self.model, model=self.model,
max_iterations=15, max_iterations=15,
max_tool_result_chars=self.max_tool_result_chars, max_tool_result_chars=self.max_tool_result_chars,
hook=_SubagentHook(task_id), hook=_SubagentHook(task_id, status),
max_iterations_message="Task completed but no final response was generated.", max_iterations_message="Task completed but no final response was generated.",
error_message=None, error_message=None,
fail_on_tool_error=True, fail_on_tool_error=True,
checkpoint_callback=_on_checkpoint,
)) ))
if result.stop_reason == "tool_error": if result.stop_reason == "tool_error":
status.phase = "done"
status.stop_reason = result.stop_reason
status.tool_events = list(result.tool_events)
await self._announce_result( await self._announce_result(
task_id, task_id,
label, label,
@ -161,6 +208,8 @@ class SubagentManager:
) )
return return
if result.stop_reason == "error": if result.stop_reason == "error":
status.phase = "done"
status.stop_reason = result.stop_reason
await self._announce_result( await self._announce_result(
task_id, task_id,
label, label,
@ -173,10 +222,14 @@ class SubagentManager:
final_result = result.final_content or "Task completed but no final response was generated." final_result = result.final_content or "Task completed but no final response was generated."
logger.info("Subagent [{}] completed successfully", task_id) logger.info("Subagent [{}] completed successfully", task_id)
status.phase = "done"
status.stop_reason = result.stop_reason
await self._announce_result(task_id, label, task, final_result, origin, "ok") await self._announce_result(task_id, label, task, final_result, origin, "ok")
except Exception as e: except Exception as e:
error_msg = f"Error: {str(e)}" error_msg = f"Error: {str(e)}"
status.phase = "error"
status.error = str(e)
logger.error("Subagent [{}] failed: {}", task_id, e) logger.error("Subagent [{}] failed: {}", task_id, e)
await self._announce_result(task_id, label, task, error_msg, origin, "error") await self._announce_result(task_id, label, task, error_msg, origin, "error")

View File

@ -2,8 +2,10 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import copy import copy
import inspect import inspect
import time
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from loguru import logger from loguru import logger
@ -56,6 +58,7 @@ class SelfTool(Tool):
_MAX_VALUE_ELEMENTS = 1024 _MAX_VALUE_ELEMENTS = 1024
_MAX_RUNTIME_KEYS = 64 _MAX_RUNTIME_KEYS = 64
_CALL_TIMEOUT_SECONDS = 30
def __init__(self, loop: AgentLoop) -> None: def __init__(self, loop: AgentLoop) -> None:
self._loop = loop self._loop = loop
@ -160,6 +163,8 @@ class SelfTool(Tool):
@staticmethod @staticmethod
def _format_value(val: Any, key: str = "") -> str: def _format_value(val: Any, key: str = "") -> str:
"""Format a value for inspect output.""" """Format a value for inspect output."""
from nanobot.agent.subagent import SubagentStatus
if val is None: if val is None:
return f"{key}: None" if key else "None" return f"{key}: None" if key else "None"
@ -168,6 +173,36 @@ class SelfTool(Tool):
text = f"{key}: {val!r}" if key else repr(val) text = f"{key}: {val!r}" if key else repr(val)
return text return text
# SubagentStatus → rich single-status display (before callable check)
if isinstance(val, SubagentStatus):
elapsed = time.monotonic() - val.started_at
tool_summary = ", ".join(
f"{e.get('name', '?')}({e.get('status', '?')})" for e in val.tool_events[-5:]
) or "none"
lines = [
f"Subagent [{val.task_id}] '{val.label}'",
f" task: {val.task_description}",
f" phase: {val.phase}, iteration: {val.iteration}, elapsed: {elapsed:.1f}s",
f" tools: {tool_summary}",
f" usage: {val.usage or 'n/a'}",
]
if val.error:
lines.append(f" error: {val.error}")
if val.stop_reason:
lines.append(f" stop_reason: {val.stop_reason}")
return "\n".join(lines)
# Has _running_tasks → SubagentManager-like (before callable check)
if hasattr(val, "_running_tasks") and isinstance(getattr(val, "_running_tasks"), dict):
# Try _task_statuses first for rich display
statuses = getattr(val, "_task_statuses", None)
if statuses and isinstance(statuses, dict) and len(statuses) > 0:
return SelfTool._format_value(statuses, key)
tasks = val._running_tasks
count = len(tasks)
ids = list(tasks.keys())[:10]
return f"subagents: {count} running — {ids}"
# Callable → method hint # Callable → method hint
if callable(val): if callable(val):
name = getattr(val, "__name__", str(val)) name = getattr(val, "__name__", str(val))
@ -179,8 +214,27 @@ class SelfTool(Tool):
label = f"{key}." if key else "" label = f"{key}." if key else ""
return f"method {label}{name}{sig} — use 'call' action to invoke" return f"method {label}{name}{sig} — use 'call' action to invoke"
# Dict # Dict — check for dict[str, SubagentStatus] first
if isinstance(val, dict): if isinstance(val, dict):
if val and all(isinstance(v, SubagentStatus) for v in val.values()):
prefix = f"{key}: " if key else ""
count = len(val)
if count == 0:
return f"{prefix}no running subagents"
lines = [f"{prefix}{count} subagent(s):"]
for tid, st in val.items():
elapsed = time.monotonic() - st.started_at
tool_summary = ", ".join(
f"{e.get('name', '?')}({e.get('status', '?')})" for e in st.tool_events[-5:]
) or "none"
lines.append(
f" [{tid}] '{st.label}'\n"
f" phase: {st.phase}, iteration: {st.iteration}, elapsed: {elapsed:.1f}s\n"
f" tools: {tool_summary}\n"
f" usage: {st.usage or 'n/a'}"
)
return "\n".join(lines)
r = repr(val) r = repr(val)
if len(r) > 2000: if len(r) > 2000:
r = r[:2000] + "... (truncated)" r = r[:2000] + "... (truncated)"
@ -200,13 +254,6 @@ class SelfTool(Tool):
names = val.tool_names names = val.tool_names
return f"tools: {len(names)} registered — {names}" return f"tools: {len(names)} registered — {names}"
# Has _running_tasks → SubagentManager-like
if hasattr(val, "_running_tasks") and isinstance(getattr(val, "_running_tasks"), dict):
tasks = val._running_tasks
count = len(tasks)
ids = list(tasks.keys())[:10]
return f"subagents: {count} running — {ids}"
# Generic object → list public attrs # Generic object → list public attrs
attrs = [a for a in dir(val) if not a.startswith("_") and not callable(getattr(val, a, None))] attrs = [a for a in dir(val) if not a.startswith("_") and not callable(getattr(val, a, None))]
cls_name = type(val).__name__ cls_name = type(val).__name__
@ -307,6 +354,9 @@ class SelfTool(Tool):
# Dot-path modify: resolve parent, set leaf # Dot-path modify: resolve parent, set leaf
if "." in key: if "." in key:
parent_path, leaf = key.rsplit(".", 1) parent_path, leaf = key.rsplit(".", 1)
if leaf in self._DENIED_ATTRS or leaf.startswith("__"):
self._audit("modify", f"BLOCKED leaf '{leaf}'")
return f"Error: '{leaf}' is not accessible"
parent, err = self._resolve_path(parent_path) parent, err = self._resolve_path(parent_path)
if err: if err:
return f"Error: {err}" return f"Error: {err}"
@ -372,27 +422,29 @@ class SelfTool(Tool):
return f"Set _runtime_vars.{key} = {value!r}" return f"Set _runtime_vars.{key} = {value!r}"
@classmethod @classmethod
def _validate_json_safe(cls, value: Any, depth: int = 0, elements: int = 0) -> str | None: def _validate_json_safe(cls, value: Any, depth: int = 0, _elements: list[int] | None = None) -> str | None:
if _elements is None:
_elements = [0]
if depth > 10: if depth > 10:
return "value nesting too deep (max 10 levels)" return "value nesting too deep (max 10 levels)"
if isinstance(value, (str, int, float, bool, type(None))): if isinstance(value, (str, int, float, bool, type(None))):
return None return None
if isinstance(value, list): if isinstance(value, list):
elements += len(value) _elements[0] += len(value)
if elements > cls._MAX_VALUE_ELEMENTS: if _elements[0] > cls._MAX_VALUE_ELEMENTS:
return f"value too large (max {cls._MAX_VALUE_ELEMENTS} total elements)" return f"value too large (max {cls._MAX_VALUE_ELEMENTS} total elements)"
for i, item in enumerate(value): for i, item in enumerate(value):
if err := cls._validate_json_safe(item, depth + 1, elements): if err := cls._validate_json_safe(item, depth + 1, _elements):
return f"list[{i}] contains {err}" return f"list[{i}] contains {err}"
return None return None
if isinstance(value, dict): if isinstance(value, dict):
elements += len(value) _elements[0] += len(value)
if elements > cls._MAX_VALUE_ELEMENTS: if _elements[0] > cls._MAX_VALUE_ELEMENTS:
return f"value too large (max {cls._MAX_VALUE_ELEMENTS} total elements)" return f"value too large (max {cls._MAX_VALUE_ELEMENTS} total elements)"
for k, v in value.items(): for k, v in value.items():
if not isinstance(k, str): if not isinstance(k, str):
return f"dict key must be str, got {type(k).__name__}" return f"dict key must be str, got {type(k).__name__}"
if err := cls._validate_json_safe(v, depth + 1, elements): if err := cls._validate_json_safe(v, depth + 1, _elements):
return f"dict key '{k}' contains {err}" return f"dict key '{k}' contains {err}"
return None return None
return f"unsupported type {type(value).__name__}" return f"unsupported type {type(value).__name__}"
@ -412,9 +464,12 @@ class SelfTool(Tool):
result = obj(**(args or {})) result = obj(**(args or {}))
# Await if coroutine # Await if coroutine
if inspect.isawaitable(result): if inspect.isawaitable(result):
result = await result result = await asyncio.wait_for(result, timeout=self._CALL_TIMEOUT_SECONDS)
self._audit("call", method) self._audit("call", method)
return repr(result) return repr(result)
except asyncio.TimeoutError:
self._audit("call", f"{method} TIMEOUT after {self._CALL_TIMEOUT_SECONDS}s")
return f"Error: call to '{method}' timed out after {self._CALL_TIMEOUT_SECONDS}s"
except Exception as e: except Exception as e:
self._audit("call", f"{method} FAILED: {e}") self._audit("call", f"{method} FAILED: {e}")
return f"Error calling {method}: {e}" return f"Error calling {method}: {e}"

View File

@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import copy import copy
import time
from pathlib import Path from pathlib import Path
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
@ -263,6 +264,28 @@ class TestModifyBlocked:
result = await tool.execute(action="modify", key="__class__", value="evil") result = await tool.execute(action="modify", key="__class__", value="evil")
assert "protected" in result assert "protected" in result
@pytest.mark.asyncio
async def test_modify_dotpath_leaf_dunder_blocked(self):
"""Fix 3.1: leaf segment of dot-path must also be validated."""
tool = _make_tool()
result = await tool.execute(
action="modify",
key="provider_retry_mode.__class__",
value="evil",
)
assert "not accessible" in result
@pytest.mark.asyncio
async def test_modify_dotpath_leaf_denied_attr_blocked(self):
"""Fix 3.1: leaf segment matching _DENIED_ATTRS must be rejected."""
tool = _make_tool()
result = await tool.execute(
action="modify",
key="provider_retry_mode.__globals__",
value={},
)
assert "not accessible" in result
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# modify — free tier (setattr priority) # modify — free tier (setattr priority)
@ -370,6 +393,31 @@ class TestModifyOpen:
assert "Set workspace" in result assert "Set workspace" in result
# ---------------------------------------------------------------------------
# validate_json_safe — element counting
# ---------------------------------------------------------------------------
class TestValidateJsonSafe:
def test_sibling_lists_counted_correctly(self):
"""Fix 3.3: sibling list elements should accumulate across recursion."""
# Two sibling lists of 600 each = 1200 total elements, over the 1024 limit
big_value = {"a": list(range(600)), "b": list(range(600))}
err = SelfTool._validate_json_safe(big_value)
assert err is not None
assert "too large" in err
def test_single_list_within_limit(self):
"""A single list of 500 items should pass."""
ok_value = list(range(500))
assert SelfTool._validate_json_safe(ok_value) is None
def test_deeply_nested_within_limit(self):
"""Deeply nested structures that stay under limit should pass."""
value = {"level1": {"level2": {"level3": list(range(100))}}}
assert SelfTool._validate_json_safe(value) is None
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# call — method invocation # call — method invocation
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -410,6 +458,25 @@ class TestCall:
) )
assert "completed" in result.lower() or result # just no error assert "completed" in result.lower() or result # just no error
@pytest.mark.asyncio
async def test_call_async_method_timeout(self):
"""Fix 3.2: async call that exceeds timeout should be cancelled."""
import asyncio
loop = _make_mock_loop()
loop.consolidator = MagicMock()
async def _slow(**kwargs):
await asyncio.sleep(60)
loop.consolidator.slow_method = _slow
tool = _make_tool(loop)
result = await tool.execute(
action="call",
method="consolidator.slow_method",
)
assert "timed out" in result
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_call_blocked_attr_in_path(self): async def test_call_blocked_attr_in_path(self):
tool = _make_tool() tool = _make_tool()
@ -713,3 +780,279 @@ class TestWatchdog:
loop._watchdog_check() loop._watchdog_check()
assert loop.max_iterations == 50 assert loop.max_iterations == 50
assert loop.context_window_tokens == 131072 assert loop.context_window_tokens == 131072
# ---------------------------------------------------------------------------
# SubagentStatus formatting
# ---------------------------------------------------------------------------
class TestSubagentStatusFormatting:
def test_format_single_status(self):
"""_format_value should produce a rich multi-line display for a SubagentStatus."""
from nanobot.agent.subagent import SubagentStatus
status = SubagentStatus(
task_id="abc12345",
label="read logs and summarize",
task_description="Read the log files and produce a summary",
started_at=time.monotonic() - 12.4,
phase="awaiting_tools",
iteration=3,
tool_events=[
{"name": "read_file", "status": "ok", "detail": "read app.log"},
{"name": "grep", "status": "ok", "detail": "searched ERROR"},
{"name": "exec", "status": "error", "detail": "timeout"},
],
usage={"prompt_tokens": 4500, "completion_tokens": 1200},
)
result = SelfTool._format_value(status)
assert "abc12345" in result
assert "read logs and summarize" in result
assert "awaiting_tools" in result
assert "iteration: 3" in result
assert "read_file(ok)" in result
assert "exec(error)" in result
assert "4500" in result
def test_format_status_dict(self):
"""_format_value should handle dict[str, SubagentStatus] with rich display."""
from nanobot.agent.subagent import SubagentStatus
statuses = {
"abc12345": SubagentStatus(
task_id="abc12345",
label="task A",
task_description="Do task A",
started_at=time.monotonic() - 5.0,
phase="awaiting_tools",
iteration=1,
),
}
result = SelfTool._format_value(statuses)
assert "1 subagent(s)" in result
assert "abc12345" in result
assert "task A" in result
def test_format_empty_status_dict(self):
"""Empty dict[str, SubagentStatus] should show 'no running subagents'."""
result = SelfTool._format_value({})
assert "{}" in result
def test_format_status_with_error(self):
"""Status with error should include the error message."""
from nanobot.agent.subagent import SubagentStatus
status = SubagentStatus(
task_id="err00001",
label="failing task",
task_description="A task that fails",
started_at=time.monotonic() - 1.0,
phase="error",
error="Connection refused",
)
result = SelfTool._format_value(status)
assert "error: Connection refused" in result
def test_format_subagent_manager_with_statuses(self):
"""SubagentManager-like object with _task_statuses should show rich display."""
from nanobot.agent.subagent import SubagentStatus
mgr = MagicMock()
mgr._running_tasks = {"abc": MagicMock()}
mgr._task_statuses = {
"abc": SubagentStatus(
task_id="abc",
label="work",
task_description="Do work",
started_at=time.monotonic() - 2.0,
phase="tools_completed",
iteration=2,
),
}
result = SelfTool._format_value(mgr)
assert "abc" in result
assert "work" in result
assert "tools_completed" in result
def test_format_subagent_manager_fallback_no_statuses(self):
"""SubagentManager with empty _task_statuses falls back to simple display."""
mgr = MagicMock()
mgr._running_tasks = {"abc": MagicMock()}
mgr._task_statuses = {}
result = SelfTool._format_value(mgr)
assert "1 running" in result
# ---------------------------------------------------------------------------
# _SubagentHook after_iteration updates status
# ---------------------------------------------------------------------------
class TestSubagentHookStatus:
@pytest.mark.asyncio
async def test_after_iteration_updates_status(self):
"""after_iteration should copy iteration, tool_events, usage to status."""
from nanobot.agent.subagent import SubagentStatus, _SubagentHook
from nanobot.agent.hook import AgentHookContext
status = SubagentStatus(
task_id="test",
label="test",
task_description="test",
started_at=time.monotonic(),
)
hook = _SubagentHook("test", status)
context = AgentHookContext(
iteration=5,
messages=[],
tool_events=[{"name": "read_file", "status": "ok", "detail": "ok"}],
usage={"prompt_tokens": 100, "completion_tokens": 50},
)
await hook.after_iteration(context)
assert status.iteration == 5
assert len(status.tool_events) == 1
assert status.tool_events[0]["name"] == "read_file"
assert status.usage == {"prompt_tokens": 100, "completion_tokens": 50}
@pytest.mark.asyncio
async def test_after_iteration_with_error(self):
"""after_iteration should set status.error when context has an error."""
from nanobot.agent.subagent import SubagentStatus, _SubagentHook
from nanobot.agent.hook import AgentHookContext
status = SubagentStatus(
task_id="test",
label="test",
task_description="test",
started_at=time.monotonic(),
)
hook = _SubagentHook("test", status)
context = AgentHookContext(
iteration=1,
messages=[],
error="something went wrong",
)
await hook.after_iteration(context)
assert status.error == "something went wrong"
@pytest.mark.asyncio
async def test_after_iteration_no_status_is_noop(self):
"""after_iteration with no status should be a no-op."""
from nanobot.agent.subagent import _SubagentHook
from nanobot.agent.hook import AgentHookContext
hook = _SubagentHook("test")
context = AgentHookContext(iteration=1, messages=[])
await hook.after_iteration(context) # should not raise
# ---------------------------------------------------------------------------
# Checkpoint callback updates status
# ---------------------------------------------------------------------------
class TestCheckpointCallback:
@pytest.mark.asyncio
async def test_checkpoint_updates_phase_and_iteration(self):
"""The _on_checkpoint callback should update status.phase and iteration."""
from nanobot.agent.subagent import SubagentStatus
import asyncio
status = SubagentStatus(
task_id="cp",
label="test",
task_description="test",
started_at=time.monotonic(),
)
# Simulate the checkpoint callback as defined in _run_subagent
async def _on_checkpoint(payload: dict) -> None:
status.phase = payload.get("phase", status.phase)
status.iteration = payload.get("iteration", status.iteration)
await _on_checkpoint({"phase": "awaiting_tools", "iteration": 2})
assert status.phase == "awaiting_tools"
assert status.iteration == 2
await _on_checkpoint({"phase": "tools_completed", "iteration": 3})
assert status.phase == "tools_completed"
assert status.iteration == 3
@pytest.mark.asyncio
async def test_checkpoint_preserves_phase_on_missing_key(self):
"""If payload doesn't have 'phase', status.phase should stay unchanged."""
from nanobot.agent.subagent import SubagentStatus
status = SubagentStatus(
task_id="cp",
label="test",
task_description="test",
started_at=time.monotonic(),
phase="initializing",
)
async def _on_checkpoint(payload: dict) -> None:
status.phase = payload.get("phase", status.phase)
status.iteration = payload.get("iteration", status.iteration)
await _on_checkpoint({"iteration": 1})
assert status.phase == "initializing"
assert status.iteration == 1
# ---------------------------------------------------------------------------
# inspect subagents._task_statuses via dot-path
# ---------------------------------------------------------------------------
class TestInspectTaskStatuses:
@pytest.mark.asyncio
async def test_inspect_task_statuses_dotpath(self):
"""self inspect subagents._task_statuses should show rich status."""
from nanobot.agent.subagent import SubagentStatus
loop = _make_mock_loop()
loop.subagents._task_statuses = {
"abc12345": SubagentStatus(
task_id="abc12345",
label="read logs",
task_description="Read the log files",
started_at=time.monotonic() - 8.0,
phase="awaiting_tools",
iteration=2,
tool_events=[{"name": "read_file", "status": "ok", "detail": "ok"}],
usage={"prompt_tokens": 500, "completion_tokens": 100},
),
}
tool = _make_tool(loop)
result = await tool.execute(action="inspect", key="subagents._task_statuses")
assert "abc12345" in result
assert "read logs" in result
assert "awaiting_tools" in result
@pytest.mark.asyncio
async def test_inspect_single_subagent_status(self):
"""self inspect should format a single SubagentStatus object."""
from nanobot.agent.subagent import SubagentStatus
loop = _make_mock_loop()
status = SubagentStatus(
task_id="xyz",
label="search code",
task_description="Search the codebase",
started_at=time.monotonic() - 3.0,
phase="done",
iteration=4,
stop_reason="completed",
)
loop.subagents._task_statuses = {"xyz": status}
tool = _make_tool(loop)
result = await tool.execute(action="inspect", key="subagents._task_statuses.xyz")
assert "xyz" in result
assert "search code" in result
assert "stop_reason: completed" in result