feat(agent): add SelfTool for runtime self-inspection and configuration

Add a built-in tool that lets the agent inspect and modify its own
runtime state (model, iterations, context window, etc.).

Key features:
- inspect: view current config, usage stats, and subagent status
- modify: adjust parameters at runtime (protected by type/range validation)
- Subagent observability: inspect running subagent tasks (phase,
  iteration, tool events, errors) — subagents are no longer a black box
- Watchdog corrects out-of-bounds values on each iteration
- Enabled by default in read-only mode (self_modify: false)
- All changes are in-memory only; restart restores defaults
- Comprehensive test suite (90 tests)

Includes a self-awareness skill (always-on) with progressive disclosure:
SKILL.md for core rules, references/examples.md for detailed scenarios.
This commit is contained in:
chengyongru 2026-04-15 09:15:27 +08:00
parent dec26396ed
commit d7b3c10a82
15 changed files with 2011 additions and 44 deletions

203
docs/MY_TOOL.md Normal file
View File

@ -0,0 +1,203 @@
# My Tool
Let the agent sense and adjust its own runtime state — like asking a coworker "are you busy? can you switch to a bigger monitor?"
## Why You Need It
Normal tools let the agent operate on the outside world (read/write files, search code). But the agent knows nothing about itself — it doesn't know which model it's running on, how many iterations are left, or how many tokens it has consumed.
My tool fills this gap. With it, the agent can:
- **Know who it is**: What model am I using? Where is my workspace? How many iterations remain?
- **Adapt on the fly**: Complex task? Expand the context window. Simple chat? Switch to a faster model.
- **Remember across turns**: Store notes in your scratchpad that persist into the next conversation turn.
## Configuration
Enabled by default (read-only mode). The agent can check its state but not set it.
```yaml
tools:
my_enabled: true # default: true
my_set: false # default: false (read-only)
```
To allow the agent to set its configuration (e.g. switch models, adjust parameters), set `my_set: true`.
All modifications are held in memory only — restart restores defaults.
---
## check — Check "my" current state
Without parameters, returns a key config overview:
```
my(action="check")
# → max_iterations: 40
# context_window_tokens: 65536
# model: 'anthropic/claude-sonnet-4-20250514'
# workspace: PosixPath('/tmp/workspace')
# provider_retry_mode: 'standard'
# max_tool_result_chars: 16000
# _current_iteration: 3
# _last_usage: {'prompt_tokens': 45000, 'completion_tokens': 8000}
# Note: prompt_tokens is cumulative across all turns, not current context window occupancy.
```
With a key parameter, drill into a specific config:
```
my(action="check", key="_last_usage.prompt_tokens")
# → How many prompt tokens I've used so far
my(action="check", key="model")
# → What model I'm currently running on
my(action="check", key="web_config.enable")
# → Whether web search is enabled
```
### What you can do with it
| Scenario | How |
|----------|-----|
| "What model are you using?" | `check("model")` |
| "How many more tool calls can you make?" | `check("max_iterations")` minus `check("_current_iteration")` |
| "How many tokens has this conversation used?" | `check("_last_usage")` — cumulative across all turns |
| "Where is your working directory?" | `check("workspace")` |
| "Show me your full config" | `check()` |
| "Are there any subagents running?" | `check("subagents")` — shows phase, iteration, elapsed time, tool events |
---
## set — Runtime tuning
Changes take effect immediately, no restart required.
```
my(action="set", key="max_iterations", value=80)
# → Bump iteration limit from 40 to 80
my(action="set", key="model", value="fast-model")
# → Switch to a faster model
my(action="set", key="context_window_tokens", value=131072)
# → Expand context window for long documents
```
You can also store custom state in your scratchpad:
```
my(action="set", key="current_project", value="nanobot")
my(action="set", key="user_style_preference", value="concise")
my(action="set", key="task_complexity", value="high")
# → These values persist into the next conversation turn
```
### Protected parameters
These parameters have type and range validation — invalid values are rejected:
| Parameter | Type | Range | Purpose |
|-----------|------|-------|---------|
| `max_iterations` | int | 1100 | Max tool calls per conversation turn |
| `context_window_tokens` | int | 4,0961,000,000 | Context window size |
| `model` | str | non-empty | LLM model to use |
Other parameters (e.g. `workspace`, `provider_retry_mode`, `max_tool_result_chars`) can be set freely, as long as the value is JSON-safe.
---
## Practical Scenarios
### "This task is complex, I need more room"
```
Agent: This codebase is large, let me expand my context window to handle it.
→ my(action="set", key="context_window_tokens", value=131072)
```
### "Simple question, don't waste compute"
```
Agent: This is a straightforward question, let me switch to a faster model.
→ my(action="set", key="model", value="fast-model")
```
### "Remember user preferences across turns"
```
Turn 1: my(action="set", key="user_prefers_concise", value=True)
Turn 2: my(action="check", key="user_prefers_concise")
# → True (still remembers the user likes concise replies)
```
### "Self-diagnosis"
```
User: "Why aren't you searching the web?"
Agent: Let me check my web config.
→ my(action="check", key="web_config.enable")
# → False
Agent: Web search is disabled — please set web.enable: true in your config.
```
### "Token budget management"
```
Agent: Let me check how much budget I have left.
→ my(action="check", key="_last_usage")
# → {"prompt_tokens": 45000, "completion_tokens": 8000}
Agent: I've used ~53k tokens total so far. I'll keep my remaining replies concise.
```
### "Subagent monitoring"
```
Agent: Let me check on the background tasks.
→ my(action="check", key="subagents")
# → 2 subagent(s):
# [task-1] 'Code review'
# phase: running, iteration: 5, elapsed: 12.3s
# tools: read(✓), grep(✓)
# usage: {'prompt_tokens': 8000, 'completion_tokens': 1200}
# [task-2] 'Write tests'
# phase: pending, iteration: 0, elapsed: 0.2s
# tools: none
Agent: The code review is progressing well. The test task hasn't started yet.
```
---
## Safety Mechanisms
Core design principle: **All modifications live in memory only. Restart restores defaults.** The agent cannot cause persistent damage.
### Off-limits (BLOCKED)
Cannot be checked or modified — fully hidden:
| Category | Attributes | Reason |
|----------|-----------|--------|
| Core infrastructure | `bus`, `provider`, `_running` | Changes would crash the system |
| Tool registry | `tools` | Must not remove its own tools |
| Subsystems | `runner`, `sessions`, `consolidator`, etc. | Affects other users/sessions |
| Sensitive data | `_mcp_servers`, `_pending_queues`, etc. | Contains credentials and message routing |
| Security boundaries | `restrict_to_workspace`, `channels_config` | Bypassing would violate isolation |
| Python internals | `__class__`, `__dict__`, etc. | Prevents sandbox escape |
### Read-only (check only)
Can be checked but not set:
| Category | Attributes | Reason |
|----------|-----------|--------|
| Subagent manager | `subagents` | Observable, but replacing breaks the system |
| Execution config | `exec_config` | Can check sandbox/enable status, cannot change it |
| Web config | `web_config` | Can check enable status, cannot change it |
| Iteration counter | `_current_iteration` | Updated by runner only |
### Sensitive field protection
Sub-fields matching sensitive names (`api_key`, `password`, `secret`, `token`, etc.) are blocked from both check and set, regardless of parent path. This prevents credential leaks via dot-path traversal (e.g. `web_config.search.api_key`).

View File

@ -27,6 +27,7 @@ from nanobot.agent.tools.notebook import NotebookEditTool
from nanobot.agent.tools.registry import ToolRegistry from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.search import GlobTool, GrepTool from nanobot.agent.tools.search import GlobTool, GrepTool
from nanobot.agent.tools.shell import ExecTool from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.self import MyTool
from nanobot.agent.tools.spawn import SpawnTool from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
@ -39,7 +40,7 @@ from nanobot.utils.helpers import image_placeholder_text, truncate_text as trunc
from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE
if TYPE_CHECKING: if TYPE_CHECKING:
from nanobot.config.schema import ChannelsConfig, ExecToolConfig, WebToolsConfig from nanobot.config.schema import ChannelsConfig, ExecToolConfig, ToolsConfig, WebToolsConfig
from nanobot.cron.service import CronService from nanobot.cron.service import CronService
@ -88,6 +89,9 @@ class _LoopHook(AgentHook):
await self._on_stream_end(resuming=resuming) await self._on_stream_end(resuming=resuming)
self._stream_buf = "" self._stream_buf = ""
async def before_iteration(self, context: AgentHookContext) -> None:
self._loop._current_iteration = context.iteration
async def before_execute_tools(self, context: AgentHookContext) -> None: async def before_execute_tools(self, context: AgentHookContext) -> None:
if self._on_progress: if self._on_progress:
if not self._on_stream: if not self._on_stream:
@ -154,9 +158,11 @@ class AgentLoop:
hooks: list[AgentHook] | None = None, hooks: list[AgentHook] | None = None,
unified_session: bool = False, unified_session: bool = False,
disabled_skills: list[str] | None = None, disabled_skills: list[str] | None = None,
tools_config: ToolsConfig | None = None,
): ):
from nanobot.config.schema import ExecToolConfig, WebToolsConfig from nanobot.config.schema import ExecToolConfig, ToolsConfig, WebToolsConfig
_tc = tools_config or ToolsConfig()
defaults = AgentDefaults() defaults = AgentDefaults()
self.bus = bus self.bus = bus
self.channels_config = channels_config self.channels_config = channels_config
@ -240,6 +246,10 @@ class AgentLoop:
model=self.model, model=self.model,
) )
self._register_default_tools() self._register_default_tools()
if _tc.my_enabled:
self.tools.register(MyTool(loop=self, modify_allowed=_tc.my_set))
self._runtime_vars: dict[str, Any] = {}
self._current_iteration: int = 0
self.commands = CommandRouter() self.commands = CommandRouter()
register_builtin_commands(self.commands) register_builtin_commands(self.commands)
@ -306,7 +316,7 @@ class AgentLoop:
def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Update context for all tools that need routing info.""" """Update context for all tools that need routing info."""
for name in ("message", "spawn", "cron"): for name in ("message", "spawn", "cron", "my"):
if tool := self.tools.get(name): if tool := self.tools.get(name):
if hasattr(tool, "set_context"): if hasattr(tool, "set_context"):
tool.set_context(channel, chat_id, *([message_id] if name == "message" else [])) tool.set_context(channel, chat_id, *([message_id] if name == "message" else []))
@ -420,6 +430,11 @@ class AgentLoop:
self._last_usage = result.usage self._last_usage = result.usage
if result.stop_reason == "max_iterations": if result.stop_reason == "max_iterations":
logger.warning("Max iterations ({}) reached", self.max_iterations) logger.warning("Max iterations ({}) reached", self.max_iterations)
# Push final content through stream so streaming channels (e.g. Feishu)
# update the card instead of leaving it empty.
if on_stream and on_stream_end:
await on_stream(result.final_content or "")
await on_stream_end(resuming=False)
elif result.stop_reason == "error": elif result.stop_reason == "error":
logger.error("LLM returned error: {}", (result.final_content or "")[:200]) logger.error("LLM returned error: {}", (result.final_content or "")[:200])
return result.final_content, result.tools_used, result.messages, result.stop_reason, result.had_injections return result.final_content, result.tools_used, result.messages, result.stop_reason, result.had_injections

View File

@ -2,7 +2,9 @@
import asyncio import asyncio
import json import json
import time
import uuid import uuid
from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -23,12 +25,29 @@ from nanobot.config.schema import ExecToolConfig, WebToolsConfig
from nanobot.providers.base import LLMProvider from nanobot.providers.base import LLMProvider
class _SubagentHook(AgentHook): @dataclass(slots=True)
"""Logging-only hook for subagent execution.""" class SubagentStatus:
"""Real-time status of a running subagent."""
def __init__(self, task_id: str) -> None: task_id: str
label: str
task_description: str
started_at: float # time.monotonic()
phase: str = "initializing" # initializing | awaiting_tools | tools_completed | final_response | done | error
iteration: int = 0
tool_events: list = field(default_factory=list) # [{name, status, detail}, ...]
usage: dict = field(default_factory=dict) # token usage
stop_reason: str | None = None
error: str | None = None
class _SubagentHook(AgentHook):
"""Hook for subagent execution — logs tool calls and updates status."""
def __init__(self, task_id: str, status: SubagentStatus | None = None) -> None:
super().__init__() super().__init__()
self._task_id = task_id self._task_id = task_id
self._status = status
async def before_execute_tools(self, context: AgentHookContext) -> None: async def before_execute_tools(self, context: AgentHookContext) -> None:
for tool_call in context.tool_calls: for tool_call in context.tool_calls:
@ -38,6 +57,15 @@ class _SubagentHook(AgentHook):
self._task_id, tool_call.name, args_str, self._task_id, tool_call.name, args_str,
) )
async def after_iteration(self, context: AgentHookContext) -> None:
if self._status is None:
return
self._status.iteration = context.iteration
self._status.tool_events = list(context.tool_events)
self._status.usage = dict(context.usage)
if context.error:
self._status.error = str(context.error)
class SubagentManager: class SubagentManager:
"""Manages background subagent execution.""" """Manages background subagent execution."""
@ -54,8 +82,6 @@ class SubagentManager:
restrict_to_workspace: bool = False, restrict_to_workspace: bool = False,
disabled_skills: list[str] | None = None, disabled_skills: list[str] | None = None,
): ):
from nanobot.config.schema import ExecToolConfig
self.provider = provider self.provider = provider
self.workspace = workspace self.workspace = workspace
self.bus = bus self.bus = bus
@ -67,6 +93,7 @@ class SubagentManager:
self.disabled_skills = set(disabled_skills or []) self.disabled_skills = set(disabled_skills or [])
self.runner = AgentRunner(provider) self.runner = AgentRunner(provider)
self._running_tasks: dict[str, asyncio.Task[None]] = {} self._running_tasks: dict[str, asyncio.Task[None]] = {}
self._task_statuses: dict[str, SubagentStatus] = {}
self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...} self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...}
async def spawn( async def spawn(
@ -82,8 +109,16 @@ class SubagentManager:
display_label = label or task[:30] + ("..." if len(task) > 30 else "") display_label = label or task[:30] + ("..." if len(task) > 30 else "")
origin = {"channel": origin_channel, "chat_id": origin_chat_id} origin = {"channel": origin_channel, "chat_id": origin_chat_id}
status = SubagentStatus(
task_id=task_id,
label=display_label,
task_description=task,
started_at=time.monotonic(),
)
self._task_statuses[task_id] = status
bg_task = asyncio.create_task( bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin) self._run_subagent(task_id, task, display_label, origin, status)
) )
self._running_tasks[task_id] = bg_task self._running_tasks[task_id] = bg_task
if session_key: if session_key:
@ -91,6 +126,7 @@ class SubagentManager:
def _cleanup(_: asyncio.Task) -> None: def _cleanup(_: asyncio.Task) -> None:
self._running_tasks.pop(task_id, None) self._running_tasks.pop(task_id, None)
self._task_statuses.pop(task_id, None)
if session_key and (ids := self._session_tasks.get(session_key)): if session_key and (ids := self._session_tasks.get(session_key)):
ids.discard(task_id) ids.discard(task_id)
if not ids: if not ids:
@ -107,10 +143,15 @@ class SubagentManager:
task: str, task: str,
label: str, label: str,
origin: dict[str, str], origin: dict[str, str],
status: SubagentStatus,
) -> None: ) -> None:
"""Execute the subagent task and announce the result.""" """Execute the subagent task and announce the result."""
logger.info("Subagent [{}] starting task: {}", task_id, label) logger.info("Subagent [{}] starting task: {}", task_id, label)
async def _on_checkpoint(payload: dict) -> None:
status.phase = payload.get("phase", status.phase)
status.iteration = payload.get("iteration", status.iteration)
try: try:
# Build subagent tools (no message tool, no spawn tool) # Build subagent tools (no message tool, no spawn tool)
tools = ToolRegistry() tools = ToolRegistry()
@ -145,40 +186,38 @@ class SubagentManager:
model=self.model, model=self.model,
max_iterations=15, max_iterations=15,
max_tool_result_chars=self.max_tool_result_chars, max_tool_result_chars=self.max_tool_result_chars,
hook=_SubagentHook(task_id), hook=_SubagentHook(task_id, status),
max_iterations_message="Task completed but no final response was generated.", max_iterations_message="Task completed but no final response was generated.",
error_message=None, error_message=None,
fail_on_tool_error=True, fail_on_tool_error=True,
checkpoint_callback=_on_checkpoint,
)) ))
if result.stop_reason == "tool_error": status.phase = "done"
await self._announce_result( status.stop_reason = result.stop_reason
task_id,
label,
task,
self._format_partial_progress(result),
origin,
"error",
)
return
if result.stop_reason == "error":
await self._announce_result(
task_id,
label,
task,
result.error or "Error: subagent execution failed.",
origin,
"error",
)
return
final_result = result.final_content or "Task completed but no final response was generated."
logger.info("Subagent [{}] completed successfully", task_id) if result.stop_reason == "tool_error":
await self._announce_result(task_id, label, task, final_result, origin, "ok") status.tool_events = list(result.tool_events)
await self._announce_result(
task_id, label, task,
self._format_partial_progress(result),
origin, "error",
)
elif result.stop_reason == "error":
await self._announce_result(
task_id, label, task,
result.error or "Error: subagent execution failed.",
origin, "error",
)
else:
final_result = result.final_content or "Task completed but no final response was generated."
logger.info("Subagent [{}] completed successfully", task_id)
await self._announce_result(task_id, label, task, final_result, origin, "ok")
except Exception as e: except Exception as e:
error_msg = f"Error: {str(e)}" status.phase = "error"
status.error = str(e)
logger.error("Subagent [{}] failed: {}", task_id, e) logger.error("Subagent [{}] failed: {}", task_id, e)
await self._announce_result(task_id, label, task, error_msg, origin, "error") await self._announce_result(task_id, label, task, f"Error: {e}", origin, "error")
async def _announce_result( async def _announce_result(
self, self,

439
nanobot/agent/tools/self.py Normal file
View File

@ -0,0 +1,439 @@
"""MyTool: runtime state inspection and configuration for the agent loop."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Any
from loguru import logger
from nanobot.agent.subagent import SubagentStatus
from nanobot.agent.tools.base import Tool
if TYPE_CHECKING:
from nanobot.agent.loop import AgentLoop
def _has_real_attr(obj: Any, key: str) -> bool:
"""Check if obj has a real (explicitly set) attribute, not auto-generated by mock."""
if isinstance(obj, dict):
return key in obj
d = getattr(obj, "__dict__", None)
if d is not None and key in d:
return True
for cls in type(obj).__mro__:
if key in cls.__dict__:
return True
return False
class MyTool(Tool):
"""Check and set the agent loop's runtime configuration."""
BLOCKED = frozenset({
# Core infrastructure
"bus", "provider", "_running", "tools",
# Config management
"_runtime_vars",
# Subsystems
"runner", "sessions", "consolidator",
"dream", "auto_compact", "context", "commands",
# Sensitive runtime state (credentials, message routing, task tracking)
"_mcp_servers", "_mcp_stacks", "_pending_queues",
"_session_locks", "_active_tasks", "_background_tasks",
# Security boundaries (inspect + modify both blocked)
"restrict_to_workspace", "channels_config",
"_concurrency_gate", "_unified_session", "_extra_hooks",
})
READ_ONLY = frozenset({
"subagents", # observable but replacing it would break the system
"_current_iteration", # updated by runner only
"exec_config", # inspect allowed (e.g. check sandbox), modify blocked
"web_config", # inspect allowed (e.g. check enable), modify blocked
})
_DENIED_ATTRS = frozenset({
"__class__", "__dict__", "__bases__", "__subclasses__", "__mro__",
"__init__", "__new__", "__reduce__", "__getstate__", "__setstate__",
"__del__", "__call__", "__getattr__", "__setattr__", "__delattr__",
"__code__", "__globals__", "func_globals", "func_code",
"__wrapped__", "__closure__",
})
# Sub-field names that are sensitive regardless of parent path
_SENSITIVE_NAMES = frozenset({
"api_key", "secret", "password", "token", "credential",
"private_key", "access_token", "refresh_token", "auth",
})
RESTRICTED: dict[str, dict[str, Any]] = {
"max_iterations": {"type": int, "min": 1, "max": 100},
"context_window_tokens": {"type": int, "min": 4096, "max": 1_000_000},
"model": {"type": str, "min_len": 1},
}
_MAX_RUNTIME_KEYS = 64
def __init__(self, loop: AgentLoop, modify_allowed: bool = True) -> None:
self._loop = loop
self._modify_allowed = modify_allowed
self._channel = ""
self._chat_id = ""
def __deepcopy__(self, memo: dict[int, Any]) -> MyTool:
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
result._loop = self._loop
result._modify_allowed = self._modify_allowed
result._channel = self._channel
result._chat_id = self._chat_id
return result
def set_context(self, channel: str, chat_id: str) -> None:
self._channel = channel
self._chat_id = chat_id
@property
def name(self) -> str:
return "my"
@property
def description(self) -> str:
base = (
"Check and set your own runtime state.\n"
"Actions: check, set.\n"
"- check (no key): full config overview — start here.\n"
"- check (key): drill into a value. Dot-paths allowed "
"(e.g. '_last_usage.prompt_tokens', 'web_config.enable').\n"
"- set (key, value): change config or store notes in your scratchpad. "
"Scratchpad keys persist across turns but not restarts.\n"
"Key values: _current_iteration (current progress), "
"max_iterations - _current_iteration = remaining iterations.\n"
"Note: web_config and exec_config are readable but read-only.\n"
"\n"
"When to use:\n"
"- User asks about your model, settings, or token usage → check that key.\n"
"- A tool fails or behaves unexpectedly → check the related config to diagnose.\n"
"- User asks you to remember a preference for this session → set to store it in your scratchpad.\n"
"- About to start a large task → check context_window_tokens and max_iterations first."
)
if not self._modify_allowed:
base += "\nREAD-ONLY MODE: set is disabled."
else:
base += (
"\nIMPORTANT: Before setting state, predict the potential impact. "
"If the operation could cause crashes or instability "
"(e.g. changing model), warn the user first."
)
return base
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["check", "set"],
"description": "Action to perform",
},
"key": {
"type": "string",
"description": "Dot-path for check/set. Examples: 'max_iterations', 'workspace', 'provider_retry_mode'. "
"For check without key, shows all config values.",
},
"value": {"description": "New value (for set). Type must match target (int for max_iterations/context_window_tokens, str for model)."},
},
"required": ["action"],
}
def _audit(self, action: str, detail: str) -> None:
session = f"{self._channel}:{self._chat_id}" if self._channel else "unknown"
logger.info("self.{} | {} | session:{}", action, detail, session)
# ------------------------------------------------------------------
# Path resolution
# ------------------------------------------------------------------
def _resolve_path(self, path: str) -> tuple[Any, str | None]:
parts = path.split(".")
obj = self._loop
for part in parts:
if part in self._DENIED_ATTRS or part.startswith("__"):
return None, f"'{part}' is not accessible"
if part in self.BLOCKED:
return None, f"'{part}' is not accessible"
if part.lower() in self._SENSITIVE_NAMES:
return None, f"'{part}' is not accessible"
try:
if isinstance(obj, dict):
if part in obj:
obj = obj[part]
else:
return None, f"'{part}' not found in dict"
else:
obj = getattr(obj, part)
except (KeyError, AttributeError) as e:
return None, f"'{part}' not found: {e}"
return obj, None
@staticmethod
def _validate_key(key: str | None, label: str = "key") -> str | None:
if not key or not key.strip():
return f"Error: '{label}' cannot be empty or whitespace"
return None
# ------------------------------------------------------------------
# Smart formatting
# ------------------------------------------------------------------
@staticmethod
def _format_status(st: SubagentStatus, indent: str = " ") -> str:
elapsed = time.monotonic() - st.started_at
tool_summary = ", ".join(
f"{e.get('name', '?')}({e.get('status', '?')})" for e in st.tool_events[-5:]
) or "none"
lines = [
f"{indent}phase: {st.phase}, iteration: {st.iteration}, elapsed: {elapsed:.1f}s",
f"{indent}tools: {tool_summary}",
f"{indent}usage: {st.usage or 'n/a'}",
]
if st.error:
lines.append(f"{indent}error: {st.error}")
if st.stop_reason:
lines.append(f"{indent}stop_reason: {st.stop_reason}")
return "\n".join(lines)
@staticmethod
def _format_value(val: Any, key: str = "") -> str:
if isinstance(val, SubagentStatus):
header = f"Subagent [{val.task_id}] '{val.label}'"
detail = MyTool._format_status(val, " ")
return f"{header}\n task: {val.task_description}\n{detail}"
# SubagentManager: delegate to its _task_statuses dict
if hasattr(val, "_task_statuses") and isinstance(val._task_statuses, dict):
return MyTool._format_value(val._task_statuses, key)
if isinstance(val, dict) and val and isinstance(next(iter(val.values())), SubagentStatus):
prefix = f"{key}: " if key else ""
lines = [f"{prefix}{len(val)} subagent(s):"]
for tid, st in val.items():
detail = MyTool._format_status(st, " ")
lines.append(f" [{tid}] '{st.label}'\n{detail}")
return "\n".join(lines)
if hasattr(val, "tool_names"):
return f"tools: {len(val.tool_names)} registered — {val.tool_names}"
# Scalar types — repr is fine
if isinstance(val, (str, int, float, bool, type(None))):
r = repr(val)
return f"{key}: {r}" if key else r
# Dict — small: show content; large: show keys for dot-path navigation
if isinstance(val, dict):
ks = list(val.keys())
if not ks:
return f"{key}: {{}}" if key else "{}"
if len(ks) <= 5:
r = repr(val)
if len(r) <= 200:
return f"{key}: {r}" if key else r
preview = ", ".join(str(k) for k in ks[:15])
suffix = ", ..." if len(ks) > 15 else ""
return f"{key}: {{{preview}{suffix}}}" if key else f"{{{preview}{suffix}}}"
# List/tuple — count for large, repr for small
if isinstance(val, (list, tuple)):
if len(val) > 20:
return f"{key}: [{len(val)} items]" if key else f"[{len(val)} items]"
r = repr(val)
return f"{key}: {r}" if key else r
# Complex object — small Pydantic models: show values; others: show field names for navigation
cls_name = type(val).__name__
if hasattr(val, "model_fields"):
fields = list(val.model_fields.keys())
if len(fields) <= 8:
# Small config objects: show field=value pairs
pairs = []
for f in fields:
fv = getattr(val, f, "?")
if isinstance(fv, (str, int, float, bool, type(None))):
pairs.append(f"{f}={fv!r}")
else:
pairs.append(f"{f}=<{type(fv).__name__}>")
preview = ", ".join(pairs)
return f"{key}: {preview}" if key else preview
else:
fields = [a for a in getattr(val, "__dict__", {}) if not a.startswith("__")]
if fields:
preview = ", ".join(str(f) for f in fields[:20])
suffix = ", ..." if len(fields) > 20 else ""
return f"{key}: <{cls_name}> [{preview}{suffix}]" if key else f"<{cls_name}> [{preview}{suffix}]"
r = repr(val)
return f"{key}: {r}" if key else r
# ------------------------------------------------------------------
# Action dispatch
# ------------------------------------------------------------------
async def execute(
self,
action: str,
key: str | None = None,
value: Any = None,
**_kwargs: Any,
) -> str:
if action in ("inspect", "check"):
return self._inspect(key)
if not self._modify_allowed:
return "Error: set is disabled (my_set is False)"
if action in ("modify", "set"):
return self._modify(key, value)
return f"Unknown action: {action}"
# -- inspect --
def _inspect(self, key: str | None) -> str:
if not key:
return self._inspect_all()
top = key.split(".")[0]
if top in self._DENIED_ATTRS or top.startswith("__"):
return f"Error: '{top}' is not accessible"
obj, err = self._resolve_path(key)
if err:
# "scratchpad" alias for _runtime_vars
if key == "scratchpad":
rv = self._loop._runtime_vars
return self._format_value(rv, "scratchpad") if rv else "scratchpad is empty"
# Fallback: check _runtime_vars for simple keys stored by modify
if "." not in key and key in self._loop._runtime_vars:
return self._format_value(self._loop._runtime_vars[key], key)
return f"Error: {err}"
# Guard against mock auto-generated attributes
if "." not in key and not _has_real_attr(self._loop, key):
if key in self._loop._runtime_vars:
return self._format_value(self._loop._runtime_vars[key], key)
return f"Error: '{key}' not found"
return self._format_value(obj, key)
def _inspect_all(self) -> str:
loop = self._loop
parts: list[str] = []
# RESTRICTED keys
for k in self.RESTRICTED:
parts.append(self._format_value(getattr(loop, k, None), k))
# Other useful top-level keys shown in description
for k in ("workspace", "provider_retry_mode", "max_tool_result_chars", "_current_iteration", "web_config", "exec_config", "subagents"):
if _has_real_attr(loop, k):
parts.append(self._format_value(getattr(loop, k, None), k))
# Token usage
usage = loop._last_usage
if usage:
parts.append(self._format_value(usage, "_last_usage"))
rv = loop._runtime_vars
if rv:
parts.append(self._format_value(rv, "scratchpad"))
return "\n".join(parts)
# -- modify --
def _modify(self, key: str | None, value: Any) -> str:
if err := self._validate_key(key):
return err
top = key.split(".")[0]
if top in self.BLOCKED or top in self._DENIED_ATTRS or top.startswith("__") or top.lower() in self._SENSITIVE_NAMES:
self._audit("modify", f"BLOCKED {key}")
return f"Error: '{key}' is protected and cannot be modified"
if top in self.READ_ONLY:
self._audit("modify", f"READ_ONLY {key}")
return f"Error: '{key}' is read-only and cannot be modified"
if "." in key:
parent_path, leaf = key.rsplit(".", 1)
if leaf in self._DENIED_ATTRS or leaf.startswith("__"):
self._audit("modify", f"BLOCKED leaf '{leaf}'")
return f"Error: '{leaf}' is not accessible"
if leaf.lower() in self._SENSITIVE_NAMES:
self._audit("modify", f"BLOCKED sensitive leaf '{leaf}'")
return f"Error: '{leaf}' is not accessible"
parent, err = self._resolve_path(parent_path)
if err:
return f"Error: {err}"
if isinstance(parent, dict):
parent[leaf] = value
else:
setattr(parent, leaf, value)
self._audit("modify", f"{key} = {value!r}")
return f"Set {key} = {value!r}"
if key in self.RESTRICTED:
return self._modify_restricted(key, value)
return self._modify_free(key, value)
def _modify_restricted(self, key: str, value: Any) -> str:
spec = self.RESTRICTED[key]
expected = spec["type"]
if expected is int and isinstance(value, bool):
return f"Error: '{key}' must be {expected.__name__}, got bool"
if not isinstance(value, expected):
try:
value = expected(value)
except (ValueError, TypeError):
return f"Error: '{key}' must be {expected.__name__}, got {type(value).__name__}"
old = getattr(self._loop, key)
if "min" in spec and value < spec["min"]:
return f"Error: '{key}' must be >= {spec['min']}"
if "max" in spec and value > spec["max"]:
return f"Error: '{key}' must be <= {spec['max']}"
if "min_len" in spec and len(str(value)) < spec["min_len"]:
return f"Error: '{key}' must be at least {spec['min_len']} characters"
setattr(self._loop, key, value)
self._audit("modify", f"{key}: {old!r} -> {value!r}")
return f"Set {key} = {value!r} (was {old!r})"
def _modify_free(self, key: str, value: Any) -> str:
if _has_real_attr(self._loop, key):
old = getattr(self._loop, key)
if isinstance(old, (str, int, float, bool)):
old_t, new_t = type(old), type(value)
if old_t is float and new_t is int:
pass # int → float coercion allowed
elif old_t is not new_t:
self._audit(
"modify",
f"REJECTED type mismatch {key}: expects {old_t.__name__}, got {new_t.__name__}",
)
return f"Error: '{key}' expects {old_t.__name__}, got {new_t.__name__}"
setattr(self._loop, key, value)
self._audit("modify", f"{key}: {old!r} -> {value!r}")
return f"Set {key} = {value!r} (was {old!r})"
if callable(value):
self._audit("modify", f"REJECTED callable {key}")
return "Error: cannot store callable values"
err = self._validate_json_safe(value)
if err:
self._audit("modify", f"REJECTED {key}: {err}")
return f"Error: {err}"
if key not in self._loop._runtime_vars and len(self._loop._runtime_vars) >= self._MAX_RUNTIME_KEYS:
self._audit("modify", f"REJECTED {key}: max keys ({self._MAX_RUNTIME_KEYS}) reached")
return f"Error: scratchpad is full (max {self._MAX_RUNTIME_KEYS} keys). Remove unused keys first."
old = self._loop._runtime_vars.get(key)
self._loop._runtime_vars[key] = value
self._audit("modify", f"scratchpad.{key}: {old!r} -> {value!r}")
return f"Set scratchpad.{key} = {value!r}"
@classmethod
def _validate_json_safe(cls, value: Any, depth: int = 0) -> str | None:
if depth > 10:
return "value nesting too deep (max 10 levels)"
if isinstance(value, (str, int, float, bool, type(None))):
return None
if isinstance(value, list):
for i, item in enumerate(value):
if err := cls._validate_json_safe(item, depth + 1):
return f"list[{i}] contains {err}"
return None
if isinstance(value, dict):
for k, v in value.items():
if not isinstance(k, str):
return f"dict key must be str, got {type(k).__name__}"
if err := cls._validate_json_safe(v, depth + 1):
return f"dict key '{k}' contains {err}"
return None
return f"unsupported type {type(value).__name__}"

View File

@ -593,6 +593,7 @@ def serve(
unified_session=runtime_config.agents.defaults.unified_session, unified_session=runtime_config.agents.defaults.unified_session,
disabled_skills=runtime_config.agents.defaults.disabled_skills, disabled_skills=runtime_config.agents.defaults.disabled_skills,
session_ttl_minutes=runtime_config.agents.defaults.session_ttl_minutes, session_ttl_minutes=runtime_config.agents.defaults.session_ttl_minutes,
tools_config=runtime_config.tools,
) )
model_name = runtime_config.agents.defaults.model model_name = runtime_config.agents.defaults.model
@ -687,6 +688,7 @@ def gateway(
unified_session=config.agents.defaults.unified_session, unified_session=config.agents.defaults.unified_session,
disabled_skills=config.agents.defaults.disabled_skills, disabled_skills=config.agents.defaults.disabled_skills,
session_ttl_minutes=config.agents.defaults.session_ttl_minutes, session_ttl_minutes=config.agents.defaults.session_ttl_minutes,
tools_config=config.tools,
) )
# Set cron callback (needs agent) # Set cron callback (needs agent)
@ -921,6 +923,7 @@ def agent(
unified_session=config.agents.defaults.unified_session, unified_session=config.agents.defaults.unified_session,
disabled_skills=config.agents.defaults.disabled_skills, disabled_skills=config.agents.defaults.disabled_skills,
session_ttl_minutes=config.agents.defaults.session_ttl_minutes, session_ttl_minutes=config.agents.defaults.session_ttl_minutes,
tools_config=config.tools,
) )
restart_notice = consume_restart_notice_from_env() restart_notice = consume_restart_notice_from_env()
if restart_notice and should_show_cli_restart_notice(restart_notice, session_id): if restart_notice and should_show_cli_restart_notice(restart_notice, session_id):

View File

@ -206,6 +206,8 @@ class ToolsConfig(Base):
restrict_to_workspace: bool = False # restrict all tool access to workspace directory restrict_to_workspace: bool = False # restrict all tool access to workspace directory
mcp_servers: dict[str, MCPServerConfig] = Field(default_factory=dict) mcp_servers: dict[str, MCPServerConfig] = Field(default_factory=dict)
ssrf_whitelist: list[str] = Field(default_factory=list) # CIDR ranges to exempt from SSRF blocking (e.g. ["100.64.0.0/10"] for Tailscale) ssrf_whitelist: list[str] = Field(default_factory=list) # CIDR ranges to exempt from SSRF blocking (e.g. ["100.64.0.0/10"] for Tailscale)
my_enabled: bool = True # enable the my tool (agent runtime state inspection)
my_set: bool = False # allow my tool to set state (read-only if False)
class Config(BaseSettings): class Config(BaseSettings):

View File

@ -84,6 +84,7 @@ class Nanobot:
unified_session=defaults.unified_session, unified_session=defaults.unified_session,
disabled_skills=defaults.disabled_skills, disabled_skills=defaults.disabled_skills,
session_ttl_minutes=defaults.session_ttl_minutes, session_ttl_minutes=defaults.session_ttl_minutes,
tools_config=config.tools,
) )
return cls(loop) return cls(loop)

View File

@ -0,0 +1,72 @@
---
name: my
description: Check and set the agent's own runtime state (model, iterations, context window, token usage, web config). Use when diagnosing why something doesn't work ("why can't you search the web?", "why did you stop?"), checking resource limits before complex tasks, adapting configuration for long or simple tasks, or remembering user preferences across turns. Also use when the user asks what model you are running, how many tokens you've used, or what your settings are.
always: true
---
# Self-Awareness
## How to use
1. **Identify the situation** from the categories below
2. **Call the my tool** with the appropriate action
3. **If set**, warn the user before changing impactful settings (model, iterations)
4. **For detailed examples**, read [references/examples.md](references/examples.md)
## When to check
<rule>
**Diagnose before explaining.** When something doesn't work, check your state first.
</rule>
<rule>
**Check budget before complex tasks.** Know your limits before committing.
</rule>
<rule>
**Recall across turns.** Store preferences in your scratchpad, read them back later.
</rule>
## When to set
<rule>
**Only set when benefit is clear and user is informed.** Warn before changing model.
</rule>
| Situation | Command |
|-----------|---------|
| Large codebase analysis | `my(action="set", key="context_window_tokens", value=131072)` |
| Repetitive simple tasks | `my(action="set", key="model", value="<fast-model>")` |
| Long multi-step task | `my(action="set", key="max_iterations", value=80)` |
**Tradeoff:** Bias toward stability. Only set when defaults are genuinely insufficient.
## Anti-patterns
<rule>
**Don't check every turn.** Costs a tool call. Use when you need information, not reflexively.
</rule>
<rule>
**Don't store sensitive data.** No API keys, passwords, or tokens in scratchpad.
</rule>
<rule>
**Don't set workspace.** Does not update file tool boundaries — won't work.
</rule>
## Constraints
- All modifications in-memory only — restart resets everything
- Protected params have type/range validation: `max_iterations` (1100), `context_window_tokens` (40961M), `model` (non-empty str)
- If `my_set` is false, check only
## Related tools
| Need | Use | Persists? |
|------|-----|-----------|
| Per-session temp state | `my(action="set", key="...", value=...)` | No |
| Long-term facts | Memory skill (`MEMORY.md`, `USER.md`) | Yes |
| Permanent config change | Edit config file | Yes |
**Rule of thumb:** Tomorrow? Memory. This turn only? My.

View File

@ -0,0 +1,75 @@
# My Tool — Practical Examples
Concrete scenarios showing when and how to use the my tool effectively.
## Diagnosis
### "Why can't you search the web?"
```
→ my(action="check", key="web_config.enable")
→ False
→ "Web search is disabled. Add web.enable: true to your config to enable it."
```
### "Why did you stop?"
```
→ my(action="check", key="max_iterations")
→ 40
→ my(action="check", key="_last_usage")
→ {"prompt_tokens": 62000, "completion_tokens": 3000}
→ "I hit the iteration limit (40). The task was complex. I can ask the user if they want to increase it."
```
### "What model are you running?"
```
→ my(action="check", key="model")
→ 'anthropic/claude-sonnet-4-20250514'
```
## Adaptive Behavior
### Large codebase analysis
```
→ my(action="check")
→ context_window_tokens: 65536
→ my(action="set", key="context_window_tokens", value=131072)
→ "Set context_window_tokens = 131072 (was 65536)"
→ "I've expanded my context window to handle this large codebase."
```
### Switching to a faster model for repetitive tasks
```
→ my(action="set", key="model", value="anthropic/claude-haiku-4-5-20251001")
→ "Set model = 'anthropic/claude-haiku-4-5-20251001' (was 'anthropic/claude-sonnet-4-20250514')"
→ "Switched to a faster model for these batch tasks."
```
## Cross-Turn Memory
### Remembering user preferences
```
# Turn 1: user says "keep it brief"
→ my(action="set", key="user_style", value="concise")
→ "Set scratchpad.user_style = 'concise'"
# Turn 3: new topic
→ my(action="check", key="user_style")
→ 'concise'
(adjusts response style accordingly)
```
### Tracking project context
```
→ my(action="set", key="active_branch", value="feat/auth")
→ my(action="set", key="test_framework", value="pytest")
→ my(action="set", key="has_docker", value=true)
```
## Budget Awareness
### Token-conscious behavior
```
→ my(action="check", key="_last_usage")
→ {"prompt_tokens": 58000, "completion_tokens": 12000}
→ "I've consumed ~70k tokens. I'll keep my remaining responses focused."
```

0
tests/agent/__init__.py Normal file
View File

View File

@ -1018,7 +1018,7 @@ async def test_runner_tool_error_sets_final_content():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, monkeypatch): async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, monkeypatch):
from nanobot.agent.subagent import SubagentManager from nanobot.agent.subagent import SubagentManager, SubagentStatus
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
bus = MessageBus() bus = MessageBus()
@ -1041,7 +1041,8 @@ async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, mon
monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute) monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute)
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) status = SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic())
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status)
mgr._announce_result.assert_awaited_once() mgr._announce_result.assert_awaited_once()
args = mgr._announce_result.await_args.args args = mgr._announce_result.await_args.args

View File

@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import time
from types import SimpleNamespace from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
@ -269,7 +270,9 @@ class TestSubagentCancellation:
monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute) monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute)
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) from nanobot.agent.subagent import SubagentStatus
status = SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic())
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status)
assistant_messages = [ assistant_messages = [
msg for msg in captured_second_call msg for msg in captured_second_call
@ -308,7 +311,9 @@ class TestSubagentCancellation:
mgr.runner.run = AsyncMock(side_effect=fake_run) mgr.runner.run = AsyncMock(side_effect=fake_run)
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) from nanobot.agent.subagent import SubagentStatus
status = SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic())
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status)
mgr.runner.run.assert_awaited_once() mgr.runner.run.assert_awaited_once()
mgr._announce_result.assert_awaited_once() mgr._announce_result.assert_awaited_once()
@ -344,7 +349,9 @@ class TestSubagentCancellation:
monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute) monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute)
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) from nanobot.agent.subagent import SubagentStatus
status = SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic())
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status)
mgr._announce_result.assert_awaited_once() mgr._announce_result.assert_awaited_once()
args = mgr._announce_result.await_args.args args = mgr._announce_result.await_args.args
@ -356,7 +363,7 @@ class TestSubagentCancellation:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_cancel_by_session_cancels_running_subagent_tool(self, monkeypatch, tmp_path): async def test_cancel_by_session_cancels_running_subagent_tool(self, monkeypatch, tmp_path):
from nanobot.agent.subagent import SubagentManager from nanobot.agent.subagent import SubagentManager, SubagentStatus
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse, ToolCallRequest from nanobot.providers.base import LLMResponse, ToolCallRequest
@ -389,7 +396,10 @@ class TestSubagentCancellation:
monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute) monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute)
task = asyncio.create_task( task = asyncio.create_task(
mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) mgr._run_subagent(
"sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"},
SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic()),
)
) )
mgr._running_tasks["sub-1"] = task mgr._running_tasks["sub-1"] = task
mgr._session_tasks["test:c1"] = {"sub-1"} mgr._session_tasks["test:c1"] = {"sub-1"}

View File

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
import time
from pathlib import Path from pathlib import Path
from types import SimpleNamespace from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
@ -10,7 +11,7 @@ from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from nanobot.agent.loop import AgentLoop from nanobot.agent.loop import AgentLoop
from nanobot.agent.subagent import SubagentManager from nanobot.agent.subagent import SubagentManager, SubagentStatus
from nanobot.agent.tools.search import GlobTool, GrepTool from nanobot.agent.tools.search import GlobTool, GrepTool
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
@ -319,7 +320,8 @@ async def test_subagent_registers_grep_and_glob(tmp_path: Path) -> None:
mgr.runner.run = fake_run mgr.runner.run = fake_run
mgr._announce_result = AsyncMock() mgr._announce_result = AsyncMock()
await mgr._run_subagent("sub-1", "search task", "label", {"channel": "cli", "chat_id": "direct"}) status = SubagentStatus(task_id="sub-1", label="label", task_description="search task", started_at=time.monotonic())
await mgr._run_subagent("sub-1", "search task", "label", {"channel": "cli", "chat_id": "direct"}, status)
assert "grep" in captured["tool_names"] assert "grep" in captured["tool_names"]
assert "glob" in captured["tool_names"] assert "glob" in captured["tool_names"]