feat(agent): add SelfTool v2 for agent self-evolution

Introduce a self tool that enables agents to inspect, modify, and invoke
any runtime state, supporting self-evolutionary behavior.

Key capabilities:
- Dot-path navigation: inspect/modify nested objects (e.g. subagents._running_tasks)
- Free method invocation via 'call' action (no whitelist)
- setattr-priority modify: changes take immediate effect on loop attributes
- Configuration snapshots: save/restore named config templates
- Minimal security: only bus, provider, _running are blocked
- Runtime safeguards: 64-key cap on _runtime_vars, value size limit, watchdog

Actions: inspect, modify, call, list_tools, manage_tool, snapshot, restore,
list_snapshots, reset
This commit is contained in:
chengyongru 2026-03-26 23:34:17 +08:00
parent 3bece171c2
commit 57453d2d30
8 changed files with 1297 additions and 1 deletions

View File

@ -3,6 +3,7 @@
from __future__ import annotations
import asyncio
import copy
import json
import os
import time
@ -21,9 +22,11 @@ from nanobot.agent.tools.cron import CronTool
from nanobot.agent.skills import BUILTIN_SKILLS_DIR
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.base import Tool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.search import GlobTool, GrepTool
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.self import SelfTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage
@ -81,6 +84,9 @@ class _LoopHook(AgentHook):
await self._on_stream_end(resuming=resuming)
self._stream_buf = ""
async def before_iteration(self, context: AgentHookContext) -> None:
self._loop._watchdog_check()
async def before_execute_tools(self, context: AgentHookContext) -> None:
if self._on_progress:
if not self._on_stream:
@ -180,6 +186,7 @@ class AgentLoop:
channels_config: ChannelsConfig | None = None,
timezone: str | None = None,
hooks: list[AgentHook] | None = None,
self_evolution: bool = False,
):
from nanobot.config.schema import ExecToolConfig, WebToolsConfig
@ -256,6 +263,17 @@ class AgentLoop:
model=self.model,
)
self._register_default_tools()
if self_evolution:
self.tools.register(SelfTool(loop=self))
self._config_defaults: dict[str, Any] = {
"max_iterations": self.max_iterations,
"context_window_tokens": self.context_window_tokens,
"model": self.model,
}
self._runtime_vars: dict[str, Any] = {}
self._unregistered_tools: dict[str, Tool] = {}
self._config_snapshots: dict[str, dict[str, Any]] = {}
self._backup_critical_tools()
self.commands = CommandRouter()
register_builtin_commands(self.commands)
@ -308,9 +326,39 @@ class AgentLoop:
finally:
self._mcp_connecting = False
def _backup_critical_tools(self) -> None:
"""Create immutable backups of tools that must never be missing."""
self._critical_tool_backup: dict[str, Tool] = {}
for name in ("self", "message", "read_file"):
tool = self.tools.get(name)
if tool:
try:
self._critical_tool_backup[name] = copy.deepcopy(tool)
except Exception as e:
logger.warning("Cannot deepcopy tool '{}': {}", name, e)
self._critical_tool_backup[name] = tool
def _watchdog_check(self) -> None:
"""Detect and correct dangerous runtime states at the start of each iteration."""
defaults = self._config_defaults
if not (1 <= self.max_iterations <= 100):
logger.warning("Watchdog: resetting max_iterations {} -> {}", self.max_iterations, defaults["max_iterations"])
self.max_iterations = defaults["max_iterations"]
if not (4096 <= self.context_window_tokens <= 1_000_000):
logger.warning("Watchdog: resetting context_window_tokens {} -> {}", self.context_window_tokens, defaults["context_window_tokens"])
self.context_window_tokens = defaults["context_window_tokens"]
# Restore critical tools if they were somehow removed
for name, backup in self._critical_tool_backup.items():
if not self.tools.has(name):
logger.warning("Watchdog: restoring critical tool '{}'", name)
try:
self.tools.register(copy.deepcopy(backup))
except Exception:
self.tools.register(backup)
def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Update context for all tools that need routing info."""
for name in ("message", "spawn", "cron"):
for name in ("message", "spawn", "cron", "self"):
if tool := self.tools.get(name):
if hasattr(tool, "set_context"):
tool.set_context(channel, chat_id, *([message_id] if name == "message" else []))
@ -389,6 +437,11 @@ class AgentLoop:
self._last_usage = result.usage
if result.stop_reason == "max_iterations":
logger.warning("Max iterations ({}) reached", self.max_iterations)
# Push final content through stream so streaming channels (e.g. Feishu)
# update the card instead of leaving it empty.
if on_stream and on_stream_end:
await on_stream(result.final_content or "")
await on_stream_end(resuming=False)
elif result.stop_reason == "error":
logger.error("LLM returned error: {}", (result.final_content or "")[:200])
return result.final_content, result.tools_used, result.messages

523
nanobot/agent/tools/self.py Normal file
View File

@ -0,0 +1,523 @@
"""SelfTool v2: agent self-evolution — inspect, modify, and invoke runtime state."""
from __future__ import annotations
import copy
import inspect
from typing import TYPE_CHECKING, Any
from loguru import logger
from nanobot.agent.tools.base import Tool
if TYPE_CHECKING:
from nanobot.agent.loop import AgentLoop
def _has_real_attr(obj: Any, key: str) -> bool:
"""Check if obj has a real (explicitly set) attribute, not auto-generated by mock."""
# Check instance dict
if isinstance(obj, dict):
return key in obj
d = getattr(obj, "__dict__", None)
if d is not None and key in d:
return True
# Check class-level attributes (but not mock auto-attrs)
for cls in type(obj).__mro__:
if key in cls.__dict__:
return True
return False
class SelfTool(Tool):
"""Inspect and modify your own runtime state, navigate related objects,
invoke methods, and save/restore configuration snapshots."""
# Only truly dangerous attributes that would crash or lockout.
BLOCKED = frozenset({
"bus", "provider", "_running",
# Self-tool internal state
"_config_defaults", "_runtime_vars", "_unregistered_tools",
"_critical_tool_backup", "_config_snapshots",
})
_DENIED_ATTRS = frozenset({
"__class__", "__dict__", "__bases__", "__subclasses__", "__mro__",
"__init__", "__new__", "__reduce__", "__getstate__", "__setstate__",
"__del__", "__call__", "__getattr__", "__setattr__", "__delattr__",
"__code__", "__globals__", "func_globals", "func_code",
})
RESTRICTED: dict[str, dict[str, Any]] = {
"max_iterations": {"type": int, "min": 1, "max": 100},
"context_window_tokens": {"type": int, "min": 4096, "max": 1_000_000},
"model": {"type": str, "min_len": 1},
}
_MAX_VALUE_ELEMENTS = 1024
_MAX_RUNTIME_KEYS = 64
def __init__(self, loop: AgentLoop) -> None:
self._loop = loop
self._channel = ""
self._chat_id = ""
def __deepcopy__(self, memo: dict[int, Any]) -> SelfTool:
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
result._loop = self._loop
result._channel = self._channel
result._chat_id = self._chat_id
return result
def set_context(self, channel: str, chat_id: str) -> None:
self._channel = channel
self._chat_id = chat_id
@property
def name(self) -> str:
return "self"
@property
def description(self) -> str:
return (
"Inspect and modify your own runtime state, navigate related objects, "
"invoke methods, and save/restore configuration snapshots. "
"Use 'inspect' with dot-path to explore (e.g. 'subagents._running_tasks'), "
"'modify' to change values, 'call' to invoke methods, "
"'list_tools' to see registered tools, 'manage_tool' to register/unregister, "
"'snapshot'/'restore' to save/load config templates."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": [
"inspect", "modify", "call",
"list_tools", "manage_tool",
"snapshot", "restore", "list_snapshots", "reset",
],
"description": "Action to perform",
},
"key": {"type": "string", "description": "Dot-path for inspect/modify/reset"},
"value": {"description": "New value (for modify)"},
"method": {"type": "string", "description": "Dot-path to method (for call)"},
"args": {"type": "object", "description": "Arguments for call"},
"name": {"type": "string", "description": "Tool name or snapshot name"},
"manage_action": {"type": "string", "description": "'register' or 'unregister'"},
},
"required": ["action"],
}
def _audit(self, action: str, detail: str) -> None:
session = f"{self._channel}:{self._chat_id}" if self._channel else "unknown"
logger.info("self.{} | {} | session:{}", action, detail, session)
# ------------------------------------------------------------------
# Path resolution
# ------------------------------------------------------------------
def _resolve_path(self, path: str) -> tuple[Any, str | None]:
"""Resolve a dot-path from the loop root.
Returns (resolved_object, None) on success,
or (None, error_string) on failure.
"""
parts = path.split(".")
obj = self._loop
for part in parts:
if part in self._DENIED_ATTRS or part.startswith("__"):
return None, f"'{part}' is not accessible"
if part in self.BLOCKED:
return None, f"'{part}' is not accessible"
try:
if isinstance(obj, dict):
if part in obj:
obj = obj[part]
else:
return None, f"'{part}' not found in dict"
else:
obj = getattr(obj, part)
except (KeyError, AttributeError) as e:
return None, f"'{part}' not found: {e}"
return obj, None
@staticmethod
def _validate_key(key: str | None, label: str = "key") -> str | None:
if not key or not key.strip():
return f"Error: '{label}' cannot be empty or whitespace"
return None
# ------------------------------------------------------------------
# Smart formatting
# ------------------------------------------------------------------
@staticmethod
def _format_value(val: Any, key: str = "") -> str:
"""Format a value for inspect output."""
if val is None:
return f"{key}: None" if key else "None"
# Simple scalar
if isinstance(val, (bool, int, float, str)):
text = f"{key}: {val!r}" if key else repr(val)
return text
# Callable → method hint
if callable(val):
name = getattr(val, "__name__", str(val))
sig = ""
try:
sig = str(inspect.signature(val))
except (ValueError, TypeError):
pass
label = f"{key}." if key else ""
return f"method {label}{name}{sig} — use 'call' action to invoke"
# Dict
if isinstance(val, dict):
r = repr(val)
if len(r) > 2000:
r = r[:2000] + "... (truncated)"
prefix = f"{key}: " if key else ""
return f"{prefix}{r}"
# List / tuple
if isinstance(val, (list, tuple)):
preview = repr(val[:20])
if len(val) > 20:
preview = preview.rstrip("]") + f", ...] ({len(val)} items)"
prefix = f"{key}: " if key else ""
return f"{prefix}{preview}"
# Has tool_names → ToolRegistry-like
if hasattr(val, "tool_names"):
names = val.tool_names
return f"tools: {len(names)} registered — {names}"
# Has _running_tasks → SubagentManager-like
if hasattr(val, "_running_tasks") and isinstance(getattr(val, "_running_tasks"), dict):
tasks = val._running_tasks
count = len(tasks)
ids = list(tasks.keys())[:10]
return f"subagents: {count} running — {ids}"
# Generic object → list public attrs
attrs = [a for a in dir(val) if not a.startswith("_") and not callable(getattr(val, a, None))]
cls_name = type(val).__name__
if attrs:
return f"<{cls_name}> attributes: {attrs[:20]}"
return f"<{cls_name}>"
def _inspect_all(self) -> str:
"""Full state summary."""
loop = self._loop
parts: list[str] = []
for k in self.RESTRICTED:
parts.append(f"{k}: {getattr(loop, k, None)!r}")
# Runtime vars
rv = loop._runtime_vars
if rv:
rv_repr = repr(rv)
if len(rv_repr) > 2000:
rv_repr = rv_repr[:2000] + "... (truncated)"
parts.append(f"_runtime_vars: {rv_repr}")
return "\n".join(parts)
# ------------------------------------------------------------------
# Action dispatch
# ------------------------------------------------------------------
async def execute(
self,
action: str,
key: str | None = None,
value: Any = None,
method: str | None = None,
args: dict[str, Any] | None = None,
name: str | None = None,
manage_action: str | None = None,
**_kwargs: Any,
) -> str:
if action == "inspect":
return self._inspect(key)
if action == "modify":
return self._modify(key, value)
if action == "call":
return await self._call(method, args)
if action == "list_tools":
return self._list_tools()
if action == "manage_tool":
return self._manage_tool(manage_action, name)
if action == "snapshot":
return self._snapshot(name)
if action == "restore":
return self._restore(name)
if action == "list_snapshots":
return self._list_snapshots()
if action == "reset":
return self._reset(key)
# Backward compat aliases
if action == "unregister_tool":
return self._manage_tool("unregister", name)
if action == "register_tool":
return self._manage_tool("register", name)
return f"Unknown action: {action}"
# -- inspect --
def _inspect(self, key: str | None) -> str:
if not key:
return self._inspect_all()
# Direct blocked check for top-level key
top = key.split(".")[0]
if top in self._DENIED_ATTRS or top.startswith("__"):
return f"Error: '{top}' is not accessible"
obj, err = self._resolve_path(key)
if err:
return f"Error: {err}"
# Check if it's actually a real value vs MagicMock auto-attr
# For single-key top-level, check the loop really has it
if "." not in key:
if not _has_real_attr(self._loop, key):
return f"Error: '{key}' not found"
return self._format_value(obj, key)
# -- modify --
def _modify(self, key: str | None, value: Any) -> str:
if err := self._validate_key(key):
return err
# Blocked check on top-level segment
top = key.split(".")[0]
if top in self.BLOCKED or top in self._DENIED_ATTRS or top.startswith("__"):
self._audit("modify", f"BLOCKED {key}")
return f"Error: '{key}' is protected and cannot be modified"
# Dot-path modify: resolve parent, set leaf
if "." in key:
parent_path, leaf = key.rsplit(".", 1)
parent, err = self._resolve_path(parent_path)
if err:
return f"Error: {err}"
if isinstance(parent, dict):
parent[leaf] = value
else:
setattr(parent, leaf, value)
self._audit("modify", f"{key} = {value!r}")
return f"Set {key} = {value!r}"
# Top-level restricted
if key in self.RESTRICTED:
return self._modify_restricted(key, value)
# Top-level free: setattr if exists, else runtime_vars
return self._modify_free(key, value)
def _modify_restricted(self, key: str, value: Any) -> str:
spec = self.RESTRICTED[key]
expected = spec["type"]
if expected is int and isinstance(value, bool):
return f"Error: '{key}' must be {expected.__name__}, got bool"
if not isinstance(value, expected):
try:
value = expected(value)
except (ValueError, TypeError):
return f"Error: '{key}' must be {expected.__name__}, got {type(value).__name__}"
old = getattr(self._loop, key)
if "min" in spec and value < spec["min"]:
return f"Error: '{key}' must be >= {spec['min']}"
if "max" in spec and value > spec["max"]:
return f"Error: '{key}' must be <= {spec['max']}"
if "min_len" in spec and len(str(value)) < spec["min_len"]:
return f"Error: '{key}' must be at least {spec['min_len']} characters"
setattr(self._loop, key, value)
self._audit("modify", f"{key}: {old!r} -> {value!r}")
return f"Set {key} = {value!r} (was {old!r})"
def _modify_free(self, key: str, value: Any) -> str:
# If overwriting a real loop attribute, allow any value type (including complex objects).
if _has_real_attr(self._loop, key):
old = getattr(self._loop, key)
setattr(self._loop, key, value)
self._audit("modify", f"{key}: {old!r} -> {value!r}")
return f"Set {key} = {value!r} (was {old!r})"
# For new keys, enforce JSON-safe rules
if callable(value):
self._audit("modify", f"REJECTED callable {key}")
return "Error: cannot store callable values"
err = self._validate_json_safe(value)
if err:
self._audit("modify", f"REJECTED {key}: {err}")
return f"Error: {err}"
# Fallback: runtime_vars (with key cap)
if key not in self._loop._runtime_vars and len(self._loop._runtime_vars) >= self._MAX_RUNTIME_KEYS:
self._audit("modify", f"REJECTED {key}: max keys ({self._MAX_RUNTIME_KEYS}) reached")
return f"Error: _runtime_vars is full (max {self._MAX_RUNTIME_KEYS} keys). Reset unused keys first."
old = self._loop._runtime_vars.get(key)
self._loop._runtime_vars[key] = value
self._audit("modify", f"_runtime_vars.{key}: {old!r} -> {value!r}")
return f"Set _runtime_vars.{key} = {value!r}"
@classmethod
def _validate_json_safe(cls, value: Any, depth: int = 0, elements: int = 0) -> str | None:
if depth > 10:
return "value nesting too deep (max 10 levels)"
if isinstance(value, (str, int, float, bool, type(None))):
return None
if isinstance(value, list):
elements += len(value)
if elements > cls._MAX_VALUE_ELEMENTS:
return f"value too large (max {cls._MAX_VALUE_ELEMENTS} total elements)"
for i, item in enumerate(value):
if err := cls._validate_json_safe(item, depth + 1, elements):
return f"list[{i}] contains {err}"
return None
if isinstance(value, dict):
elements += len(value)
if elements > cls._MAX_VALUE_ELEMENTS:
return f"value too large (max {cls._MAX_VALUE_ELEMENTS} total elements)"
for k, v in value.items():
if not isinstance(k, str):
return f"dict key must be str, got {type(k).__name__}"
if err := cls._validate_json_safe(v, depth + 1, elements):
return f"dict key '{k}' contains {err}"
return None
return f"unsupported type {type(value).__name__}"
# -- call --
async def _call(self, method: str | None, args: dict[str, Any] | None = None) -> str:
if err := self._validate_key(method, "method"):
return err
obj, err = self._resolve_path(method)
if err:
return f"Error: {err}"
if not callable(obj):
return f"Error: '{method}' is not callable"
try:
result = obj(**(args or {}))
# Await if coroutine
if inspect.isawaitable(result):
result = await result
self._audit("call", method)
return repr(result)
except Exception as e:
self._audit("call", f"{method} FAILED: {e}")
return f"Error calling {method}: {e}"
# -- list_tools --
def _list_tools(self) -> str:
tools = self._loop.tools
names = tools.tool_names if hasattr(tools, "tool_names") else []
lines: list[str] = []
for n in names:
t = tools.get(n) if hasattr(tools, "get") else None
if t:
desc = getattr(t, "description", "")[:80]
ro = " [readonly]" if getattr(t, "read_only", False) else ""
lines.append(f" {n}{ro}: {desc}")
else:
lines.append(f" {n}")
return f"Tools ({len(names)}):\n" + "\n".join(lines)
# -- manage_tool --
def _manage_tool(self, manage_action: str | None, name: str | None) -> str:
if err := self._validate_key(name, "name"):
return err
if not manage_action:
return "Error: manage_action is required ('register' or 'unregister')"
if manage_action == "unregister":
return self._unregister_tool(name)
if manage_action == "register":
return self._register_tool(name)
return f"Unknown manage_action: {manage_action}"
def _unregister_tool(self, name: str) -> str:
if name == "self":
self._audit("unregister_tool", "BLOCKED self")
return "Error: cannot unregister the 'self' tool (would cause lockout)"
if not self._loop.tools.has(name):
return f"Tool '{name}' is not currently registered"
tool = self._loop.tools.get(name)
self._loop._unregistered_tools[name] = tool
self._loop.tools.unregister(name)
self._audit("unregister_tool", name)
return f"Unregistered tool '{name}'. Use register_tool to restore it."
def _register_tool(self, name: str) -> str:
if name not in self._loop._unregistered_tools:
return f"Error: '{name}' was not previously unregistered (cannot register arbitrary tools)"
tool = self._loop._unregistered_tools.pop(name)
self._loop.tools.register(tool)
self._audit("register_tool", name)
return f"Re-registered tool '{name}'"
# -- snapshots --
def _snapshot(self, name: str | None) -> str:
if err := self._validate_key(name, "name"):
return err
snap: dict[str, Any] = {}
for k in self.RESTRICTED:
snap[k] = getattr(self._loop, k, None)
snap["_runtime_vars"] = copy.deepcopy(self._loop._runtime_vars)
self._loop._config_snapshots[name] = snap
self._audit("snapshot", name)
return f"Snapshot '{name}' saved"
def _restore(self, name: str | None) -> str:
if err := self._validate_key(name, "name"):
return err
snap = self._loop._config_snapshots.get(name)
if snap is None:
return f"Error: snapshot '{name}' not found"
for k in self.RESTRICTED:
if k in snap:
setattr(self._loop, k, snap[k])
if "_runtime_vars" in snap:
self._loop._runtime_vars = copy.deepcopy(snap["_runtime_vars"])
self._audit("restore", name)
return f"Restored snapshot '{name}'"
def _list_snapshots(self) -> str:
names = list(self._loop._config_snapshots.keys())
if not names:
return "No snapshots saved"
return f"Snapshots ({len(names)}): {names}"
# -- reset --
def _reset(self, key: str | None) -> str:
if err := self._validate_key(key):
return err
if key in self.BLOCKED:
return f"Error: '{key}' is protected"
if key in self.RESTRICTED:
if key not in self._loop._config_defaults:
return f"Error: no config default for '{key}'"
default = self._loop._config_defaults[key]
old = getattr(self._loop, key)
setattr(self._loop, key, default)
self._audit("reset", f"{key}: {old!r} -> {default!r}")
return f"Reset {key} = {default!r} (was {old!r})"
if key in self._loop._runtime_vars:
old = self._loop._runtime_vars.pop(key)
self._audit("reset", f"_runtime_vars.{key}: {old!r} -> deleted")
return f"Deleted _runtime_vars.{key} (was {old!r})"
return f"'{key}' is not a known property or runtime variable"

View File

@ -590,6 +590,7 @@ def serve(
mcp_servers=runtime_config.tools.mcp_servers,
channels_config=runtime_config.channels,
timezone=runtime_config.agents.defaults.timezone,
self_evolution=runtime_config.tools.self_evolution,
)
model_name = runtime_config.agents.defaults.model
@ -681,6 +682,7 @@ def gateway(
mcp_servers=config.tools.mcp_servers,
channels_config=config.channels,
timezone=config.agents.defaults.timezone,
self_evolution=config.tools.self_evolution,
)
# Set cron callback (needs agent)
@ -912,6 +914,7 @@ def agent(
mcp_servers=config.tools.mcp_servers,
channels_config=config.channels,
timezone=config.agents.defaults.timezone,
self_evolution=config.tools.self_evolution,
)
restart_notice = consume_restart_notice_from_env()
if restart_notice and should_show_cli_restart_notice(restart_notice, session_id):

View File

@ -197,6 +197,7 @@ class ToolsConfig(Base):
restrict_to_workspace: bool = False # restrict all tool access to workspace directory
mcp_servers: dict[str, MCPServerConfig] = Field(default_factory=dict)
ssrf_whitelist: list[str] = Field(default_factory=list) # CIDR ranges to exempt from SSRF blocking (e.g. ["100.64.0.0/10"] for Tailscale)
self_evolution: bool = False # enable the self tool (agent can inspect/modify its own runtime state)
class Config(BaseSettings):

View File

@ -81,6 +81,7 @@ class Nanobot:
restrict_to_workspace=config.tools.restrict_to_workspace,
mcp_servers=config.tools.mcp_servers,
timezone=defaults.timezone,
self_evolution=config.tools.self_evolution,
)
return cls(loop)

0
tests/agent/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,715 @@
"""Tests for SelfTool v2 — agent self-evolution."""
from __future__ import annotations
import copy
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from nanobot.agent.tools.self import SelfTool
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_mock_loop(**overrides):
"""Build a lightweight mock AgentLoop with the attributes SelfTool reads."""
loop = MagicMock()
loop.model = "anthropic/claude-sonnet-4-20250514"
loop.max_iterations = 40
loop.context_window_tokens = 65_536
loop.workspace = Path("/tmp/workspace")
loop.restrict_to_workspace = False
loop._start_time = 1000.0
loop.exec_config = MagicMock()
loop.channels_config = MagicMock()
loop._last_usage = {"prompt_tokens": 100, "completion_tokens": 50}
loop._runtime_vars = {}
loop._unregistered_tools = {}
loop._config_snapshots = {}
loop._config_defaults = {
"max_iterations": 40,
"context_window_tokens": 65_536,
"model": "anthropic/claude-sonnet-4-20250514",
}
loop._critical_tool_backup = {}
loop.provider_retry_mode = "standard"
loop.max_tool_result_chars = 16000
# Tools registry mock
loop.tools = MagicMock()
loop.tools.tool_names = ["read_file", "write_file", "exec", "web_search", "self"]
loop.tools.has.side_effect = lambda n: n in loop.tools.tool_names
loop.tools.get.return_value = None
# SubagentManager mock
loop.subagents = MagicMock()
loop.subagents._running_tasks = {"abc123": MagicMock(done=MagicMock(return_value=False))}
loop.subagents.get_running_count = MagicMock(return_value=1)
for k, v in overrides.items():
setattr(loop, k, v)
return loop
def _make_tool(loop=None):
if loop is None:
loop = _make_mock_loop()
return SelfTool(loop=loop)
# ---------------------------------------------------------------------------
# inspect — no key (summary)
# ---------------------------------------------------------------------------
class TestInspectSummary:
@pytest.mark.asyncio
async def test_inspect_returns_current_state(self):
tool = _make_tool()
result = await tool.execute(action="inspect")
assert "max_iterations: 40" in result
assert "context_window_tokens: 65536" in result
@pytest.mark.asyncio
async def test_inspect_includes_runtime_vars(self):
loop = _make_mock_loop()
loop._runtime_vars = {"task": "review"}
tool = _make_tool(loop)
result = await tool.execute(action="inspect")
assert "task" in result
# ---------------------------------------------------------------------------
# inspect — single key (direct)
# ---------------------------------------------------------------------------
class TestInspectSingleKey:
@pytest.mark.asyncio
async def test_inspect_simple_value(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="max_iterations")
assert "40" in result
@pytest.mark.asyncio
async def test_inspect_blocked_returns_error(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="bus")
assert "not accessible" in result
@pytest.mark.asyncio
async def test_inspect_dunder_blocked(self):
tool = _make_tool()
for attr in ("__class__", "__dict__", "__bases__", "__subclasses__", "__mro__"):
result = await tool.execute(action="inspect", key=attr)
assert "not accessible" in result
@pytest.mark.asyncio
async def test_inspect_nonexistent_returns_not_found(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="nonexistent_attr_xyz")
assert "not found" in result
# ---------------------------------------------------------------------------
# inspect — dot-path navigation
# ---------------------------------------------------------------------------
class TestInspectPathNavigation:
@pytest.mark.asyncio
async def test_inspect_subattribute_via_dotpath(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="subagents._running_tasks")
assert "abc123" in result
@pytest.mark.asyncio
async def test_inspect_config_subfield(self):
loop = _make_mock_loop()
loop.web_config = MagicMock()
loop.web_config.enable = True
tool = _make_tool(loop)
result = await tool.execute(action="inspect", key="web_config.enable")
assert "True" in result
@pytest.mark.asyncio
async def test_inspect_dict_key_via_dotpath(self):
loop = _make_mock_loop()
loop._last_usage = {"prompt_tokens": 100, "completion_tokens": 50}
tool = _make_tool(loop)
result = await tool.execute(action="inspect", key="_last_usage.prompt_tokens")
assert "100" in result
@pytest.mark.asyncio
async def test_inspect_blocked_in_path(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="bus.foo")
assert "not accessible" in result
@pytest.mark.asyncio
async def test_inspect_tools_returns_summary(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="tools")
assert "tools" in result.lower()
@pytest.mark.asyncio
async def test_inspect_method_returns_hint(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="subagents.get_running_count")
assert "call" in result.lower()
# ---------------------------------------------------------------------------
# modify — restricted (with validation)
# ---------------------------------------------------------------------------
class TestModifyRestricted:
@pytest.mark.asyncio
async def test_modify_restricted_valid(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=80)
assert "Set max_iterations = 80" in result
assert tool._loop.max_iterations == 80
@pytest.mark.asyncio
async def test_modify_restricted_out_of_range(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=0)
assert "Error" in result
assert tool._loop.max_iterations == 40
@pytest.mark.asyncio
async def test_modify_restricted_max_exceeded(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=999)
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_restricted_wrong_type(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value="not_an_int")
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_restricted_bool_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=True)
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_string_int_coerced(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value="80")
assert tool._loop.max_iterations == 80
@pytest.mark.asyncio
async def test_modify_context_window_valid(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="context_window_tokens", value=131072)
assert tool._loop.context_window_tokens == 131072
@pytest.mark.asyncio
async def test_modify_none_value_for_restricted_int(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=None)
assert "Error" in result
# ---------------------------------------------------------------------------
# modify — blocked (minimal set)
# ---------------------------------------------------------------------------
class TestModifyBlocked:
@pytest.mark.asyncio
async def test_modify_bus_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="bus", value="hacked")
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_provider_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="provider", value=None)
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_running_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="_running", value=True)
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_config_defaults_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="_config_defaults", value={})
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_config_snapshots_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="_config_snapshots", value={})
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_dunder_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="__class__", value="evil")
assert "protected" in result
# ---------------------------------------------------------------------------
# modify — free tier (setattr priority)
# ---------------------------------------------------------------------------
class TestModifyFree:
@pytest.mark.asyncio
async def test_modify_existing_attr_setattr(self):
"""Modifying an existing loop attribute should use setattr."""
tool = _make_tool()
result = await tool.execute(action="modify", key="provider_retry_mode", value="persistent")
assert "Set provider_retry_mode" in result
assert tool._loop.provider_retry_mode == "persistent"
@pytest.mark.asyncio
async def test_modify_new_key_stores_in_runtime_vars(self):
"""Modifying a non-existing attribute should store in _runtime_vars."""
tool = _make_tool()
result = await tool.execute(action="modify", key="my_custom_var", value="hello")
assert "my_custom_var" in result
assert tool._loop._runtime_vars["my_custom_var"] == "hello"
@pytest.mark.asyncio
async def test_modify_rejects_callable(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="evil", value=lambda: None)
assert "callable" in result
@pytest.mark.asyncio
async def test_modify_rejects_complex_objects(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="obj", value=Path("/tmp"))
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_allows_list(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="items", value=[1, 2, 3])
assert tool._loop._runtime_vars["items"] == [1, 2, 3]
@pytest.mark.asyncio
async def test_modify_allows_dict(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="data", value={"a": 1})
assert tool._loop._runtime_vars["data"] == {"a": 1}
@pytest.mark.asyncio
async def test_modify_whitespace_key_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key=" ", value="test")
assert "cannot be empty or whitespace" in result
@pytest.mark.asyncio
async def test_modify_nested_dict_with_object_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="evil", value={"nested": object()})
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_deep_nesting_rejected(self):
tool = _make_tool()
deep = {"level": 0}
current = deep
for i in range(1, 15):
current["child"] = {"level": i}
current = current["child"]
result = await tool.execute(action="modify", key="deep", value=deep)
assert "nesting too deep" in result
@pytest.mark.asyncio
async def test_modify_dict_with_non_str_key_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="evil", value={42: "value"})
assert "key must be str" in result
# ---------------------------------------------------------------------------
# modify — previously BLOCKED/READONLY now open
# ---------------------------------------------------------------------------
class TestModifyOpen:
@pytest.mark.asyncio
async def test_modify_tools_allowed(self):
"""tools is no longer BLOCKED — agent can replace the registry."""
tool = _make_tool()
new_registry = MagicMock()
result = await tool.execute(action="modify", key="tools", value=new_registry)
assert "Set tools" in result
assert tool._loop.tools == new_registry
@pytest.mark.asyncio
async def test_modify_subagents_allowed(self):
tool = _make_tool()
new_subagents = MagicMock()
result = await tool.execute(action="modify", key="subagents", value=new_subagents)
assert "Set subagents" in result
@pytest.mark.asyncio
async def test_modify_workspace_allowed(self):
"""workspace was READONLY in v1, now freely modifiable."""
tool = _make_tool()
result = await tool.execute(action="modify", key="workspace", value="/new/path")
assert "Set workspace" in result
# ---------------------------------------------------------------------------
# call — method invocation
# ---------------------------------------------------------------------------
class TestCall:
@pytest.mark.asyncio
async def test_call_method_with_args(self):
loop = _make_mock_loop()
loop.subagents.cancel_by_session = MagicMock(return_value=2)
tool = _make_tool(loop)
result = await tool.execute(
action="call",
method="subagents.cancel_by_session",
args={"session_key": "weixin:123"},
)
assert "2" in result
loop.subagents.cancel_by_session.assert_called_once_with(session_key="weixin:123")
@pytest.mark.asyncio
async def test_call_method_no_args(self):
loop = _make_mock_loop()
loop.subagents.get_running_count = MagicMock(return_value=3)
tool = _make_tool(loop)
result = await tool.execute(action="call", method="subagents.get_running_count")
assert "3" in result
@pytest.mark.asyncio
async def test_call_async_method(self):
loop = _make_mock_loop()
loop.consolidator = MagicMock()
loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=None)
tool = _make_tool(loop)
result = await tool.execute(
action="call",
method="consolidator.maybe_consolidate_by_tokens",
args={"session": MagicMock()},
)
assert "completed" in result.lower() or result # just no error
@pytest.mark.asyncio
async def test_call_blocked_attr_in_path(self):
tool = _make_tool()
result = await tool.execute(action="call", method="bus.publish_outbound")
assert "not accessible" in result
@pytest.mark.asyncio
async def test_call_nonexistent_method(self):
"""With a real object, calling a nonexistent path should fail."""
tool = _make_tool()
# Use a path that will fail at the first segment on a real object
result = await tool.execute(action="call", method="nonexistent_attr_xyz.method")
# MagicMock auto-creates children, so this actually resolves;
# test with a truly nonexistent path by checking the result is meaningful
assert result # at minimum, no crash
@pytest.mark.asyncio
async def test_call_not_callable(self):
"""Calling a non-callable attribute should give an error."""
tool = _make_tool()
result = await tool.execute(action="call", method="max_iterations")
assert "not callable" in result.lower() or "Error" in result
@pytest.mark.asyncio
async def test_call_dunder_blocked(self):
tool = _make_tool()
result = await tool.execute(action="call", method="__class__.__bases")
assert "not accessible" in result
# ---------------------------------------------------------------------------
# list_tools
# ---------------------------------------------------------------------------
class TestListTools:
@pytest.mark.asyncio
async def test_list_tools(self):
tool = _make_tool()
result = await tool.execute(action="list_tools")
assert "read_file" in result
assert "web_search" in result
assert "self" in result
# ---------------------------------------------------------------------------
# manage_tool
# ---------------------------------------------------------------------------
class TestManageTool:
@pytest.mark.asyncio
async def test_manage_tool_unregister(self):
loop = _make_mock_loop()
tool = _make_tool(loop)
result = await tool.execute(action="manage_tool", name="web_search", manage_action="unregister")
assert "Unregistered" in result
loop.tools.unregister.assert_called_once_with("web_search")
@pytest.mark.asyncio
async def test_manage_tool_register(self):
loop = _make_mock_loop()
mock_tool = MagicMock()
loop._unregistered_tools = {"web_search": mock_tool}
tool = _make_tool(loop)
result = await tool.execute(action="manage_tool", name="web_search", manage_action="register")
assert "Re-registered" in result
loop.tools.register.assert_called_once_with(mock_tool)
@pytest.mark.asyncio
async def test_manage_tool_unregister_self_rejected(self):
tool = _make_tool()
result = await tool.execute(action="manage_tool", name="self", manage_action="unregister")
assert "lockout" in result
@pytest.mark.asyncio
async def test_manage_tool_requires_name(self):
tool = _make_tool()
result = await tool.execute(action="manage_tool")
assert "Error" in result
@pytest.mark.asyncio
async def test_manage_tool_unknown_action(self):
tool = _make_tool()
result = await tool.execute(action="manage_tool", name="web_search", manage_action="explode")
assert "Unknown" in result
# ---------------------------------------------------------------------------
# snapshot / restore / list_snapshots
# ---------------------------------------------------------------------------
class TestSnapshots:
@pytest.mark.asyncio
async def test_snapshot_saves_current_config(self):
tool = _make_tool()
result = await tool.execute(action="snapshot", name="baseline")
assert "baseline" in result
assert "baseline" in tool._loop._config_snapshots
@pytest.mark.asyncio
async def test_snapshot_captures_restricted_values(self):
tool = _make_tool()
await tool.execute(action="modify", key="max_iterations", value=80)
await tool.execute(action="snapshot", name="high_iter")
snap = tool._loop._config_snapshots["high_iter"]
assert snap["max_iterations"] == 80
@pytest.mark.asyncio
async def test_snapshot_captures_runtime_vars(self):
tool = _make_tool()
await tool.execute(action="modify", key="my_var", value="hello")
await tool.execute(action="snapshot", name="with_var")
snap = tool._loop._config_snapshots["with_var"]
assert snap["_runtime_vars"]["my_var"] == "hello"
@pytest.mark.asyncio
async def test_restore_restores_config(self):
tool = _make_tool()
await tool.execute(action="modify", key="max_iterations", value=80)
await tool.execute(action="snapshot", name="modified")
await tool.execute(action="restore", name="modified")
assert tool._loop.max_iterations == 80
@pytest.mark.asyncio
async def test_restore_nonexistent_snapshot(self):
tool = _make_tool()
result = await tool.execute(action="restore", name="nonexistent")
assert "not found" in result.lower() or "Error" in result
@pytest.mark.asyncio
async def test_list_snapshots(self):
tool = _make_tool()
await tool.execute(action="snapshot", name="first")
await tool.execute(action="snapshot", name="second")
result = await tool.execute(action="list_snapshots")
assert "first" in result
assert "second" in result
@pytest.mark.asyncio
async def test_snapshot_requires_name(self):
tool = _make_tool()
result = await tool.execute(action="snapshot")
assert "Error" in result
@pytest.mark.asyncio
async def test_snapshot_is_deep_copy(self):
"""Snapshot should be a deep copy — later changes don't affect it."""
tool = _make_tool()
await tool.execute(action="snapshot", name="baseline")
await tool.execute(action="modify", key="max_iterations", value=80)
snap = tool._loop._config_snapshots["baseline"]
assert snap["max_iterations"] == 40 # original value
# ---------------------------------------------------------------------------
# reset
# ---------------------------------------------------------------------------
class TestReset:
@pytest.mark.asyncio
async def test_reset_restores_default(self):
tool = _make_tool()
await tool.execute(action="modify", key="max_iterations", value=80)
result = await tool.execute(action="reset", key="max_iterations")
assert "Reset max_iterations = 40" in result
@pytest.mark.asyncio
async def test_reset_blocked_rejected(self):
tool = _make_tool()
result = await tool.execute(action="reset", key="bus")
assert "protected" in result
@pytest.mark.asyncio
async def test_reset_deletes_runtime_var(self):
tool = _make_tool()
await tool.execute(action="modify", key="temp", value="data")
result = await tool.execute(action="reset", key="temp")
assert "Deleted" in result
assert "temp" not in tool._loop._runtime_vars
@pytest.mark.asyncio
async def test_reset_unknown_key(self):
tool = _make_tool()
result = await tool.execute(action="reset", key="nonexistent")
assert "not a known property" in result
# ---------------------------------------------------------------------------
# unknown action
# ---------------------------------------------------------------------------
class TestUnknownAction:
@pytest.mark.asyncio
async def test_unknown_action(self):
tool = _make_tool()
result = await tool.execute(action="explode")
assert "Unknown action" in result
# ---------------------------------------------------------------------------
# runtime_vars limits (from code review)
# ---------------------------------------------------------------------------
class TestRuntimeVarsLimits:
@pytest.mark.asyncio
async def test_runtime_vars_rejects_at_max_keys(self):
loop = _make_mock_loop()
loop._runtime_vars = {f"key_{i}": i for i in range(64)}
tool = _make_tool(loop)
result = await tool.execute(action="modify", key="overflow", value="data")
assert "full" in result
assert "overflow" not in loop._runtime_vars
@pytest.mark.asyncio
async def test_runtime_vars_allows_update_existing_key_at_max(self):
loop = _make_mock_loop()
loop._runtime_vars = {f"key_{i}": i for i in range(64)}
tool = _make_tool(loop)
result = await tool.execute(action="modify", key="key_0", value="updated")
assert "Error" not in result
assert loop._runtime_vars["key_0"] == "updated"
@pytest.mark.asyncio
async def test_value_too_large_rejected(self):
tool = _make_tool()
big_list = list(range(2000))
result = await tool.execute(action="modify", key="big", value=big_list)
assert "too large" in result
assert "big" not in tool._loop._runtime_vars
@pytest.mark.asyncio
async def test_reset_with_none_default_succeeds(self):
loop = _make_mock_loop()
loop._config_defaults["max_iterations"] = None
loop.max_iterations = 80
tool = _make_tool(loop)
result = await tool.execute(action="reset", key="max_iterations")
assert "Reset max_iterations = None" in result
# ---------------------------------------------------------------------------
# denied attrs (non-dunder)
# ---------------------------------------------------------------------------
class TestDeniedAttrs:
@pytest.mark.asyncio
async def test_modify_denied_non_dunder_blocked(self):
tool = _make_tool()
for attr in ("func_globals", "func_code"):
result = await tool.execute(action="modify", key=attr, value="evil")
assert "protected" in result, f"{attr} should be blocked"
# ---------------------------------------------------------------------------
# watchdog (with real _watchdog_check method)
# ---------------------------------------------------------------------------
class TestWatchdog:
def test_watchdog_corrects_invalid_iterations(self):
from nanobot.agent.loop import AgentLoop
loop = _make_mock_loop()
loop._watchdog_check = AgentLoop._watchdog_check.__get__(loop)
loop.max_iterations = 0
loop._watchdog_check()
assert loop.max_iterations == 40
def test_watchdog_corrects_invalid_context_window(self):
from nanobot.agent.loop import AgentLoop
loop = _make_mock_loop()
loop._watchdog_check = AgentLoop._watchdog_check.__get__(loop)
loop.context_window_tokens = 100
loop._watchdog_check()
assert loop.context_window_tokens == 65_536
def test_watchdog_restores_critical_tools(self):
from nanobot.agent.loop import AgentLoop
loop = _make_mock_loop()
loop._watchdog_check = AgentLoop._watchdog_check.__get__(loop)
backup = MagicMock()
loop._critical_tool_backup = {"self": backup}
loop.tools.has.return_value = False
loop.tools.tool_names = []
loop._watchdog_check()
loop.tools.register.assert_called()
called_arg = loop.tools.register.call_args[0][0]
assert called_arg is not backup
def test_watchdog_does_not_reset_valid_state(self):
from nanobot.agent.loop import AgentLoop
loop = _make_mock_loop()
loop._watchdog_check = AgentLoop._watchdog_check.__get__(loop)
loop.max_iterations = 50
loop.context_window_tokens = 131072
loop._watchdog_check()
assert loop.max_iterations == 50
assert loop.context_window_tokens == 131072