diff --git a/docs/MY_TOOL.md b/docs/MY_TOOL.md new file mode 100644 index 000000000..caac563e9 --- /dev/null +++ b/docs/MY_TOOL.md @@ -0,0 +1,203 @@ +# My Tool + +Let the agent sense and adjust its own runtime state — like asking a coworker "are you busy? can you switch to a bigger monitor?" + +## Why You Need It + +Normal tools let the agent operate on the outside world (read/write files, search code). But the agent knows nothing about itself — it doesn't know which model it's running on, how many iterations are left, or how many tokens it has consumed. + +My tool fills this gap. With it, the agent can: + +- **Know who it is**: What model am I using? Where is my workspace? How many iterations remain? +- **Adapt on the fly**: Complex task? Expand the context window. Simple chat? Switch to a faster model. +- **Remember across turns**: Store notes in your scratchpad that persist into the next conversation turn. + +## Configuration + +Enabled by default (read-only mode). The agent can check its state but not set it. + +```yaml +tools: + my_enabled: true # default: true + my_set: false # default: false (read-only) +``` + +To allow the agent to set its configuration (e.g. switch models, adjust parameters), set `my_set: true`. + +All modifications are held in memory only — restart restores defaults. + +--- + +## check — Check "my" current state + +Without parameters, returns a key config overview: + +``` +my(action="check") +# → max_iterations: 40 +# context_window_tokens: 65536 +# model: 'anthropic/claude-sonnet-4-20250514' +# workspace: PosixPath('/tmp/workspace') +# provider_retry_mode: 'standard' +# max_tool_result_chars: 16000 +# _current_iteration: 3 +# _last_usage: {'prompt_tokens': 45000, 'completion_tokens': 8000} +# Note: prompt_tokens is cumulative across all turns, not current context window occupancy. +``` + +With a key parameter, drill into a specific config: + +``` +my(action="check", key="_last_usage.prompt_tokens") +# → How many prompt tokens I've used so far + +my(action="check", key="model") +# → What model I'm currently running on + +my(action="check", key="web_config.enable") +# → Whether web search is enabled +``` + +### What you can do with it + +| Scenario | How | +|----------|-----| +| "What model are you using?" | `check("model")` | +| "How many more tool calls can you make?" | `check("max_iterations")` minus `check("_current_iteration")` | +| "How many tokens has this conversation used?" | `check("_last_usage")` — cumulative across all turns | +| "Where is your working directory?" | `check("workspace")` | +| "Show me your full config" | `check()` | +| "Are there any subagents running?" | `check("subagents")` — shows phase, iteration, elapsed time, tool events | + +--- + +## set — Runtime tuning + +Changes take effect immediately, no restart required. + +``` +my(action="set", key="max_iterations", value=80) +# → Bump iteration limit from 40 to 80 + +my(action="set", key="model", value="fast-model") +# → Switch to a faster model + +my(action="set", key="context_window_tokens", value=131072) +# → Expand context window for long documents +``` + +You can also store custom state in your scratchpad: + +``` +my(action="set", key="current_project", value="nanobot") +my(action="set", key="user_style_preference", value="concise") +my(action="set", key="task_complexity", value="high") +# → These values persist into the next conversation turn +``` + +### Protected parameters + +These parameters have type and range validation — invalid values are rejected: + +| Parameter | Type | Range | Purpose | +|-----------|------|-------|---------| +| `max_iterations` | int | 1–100 | Max tool calls per conversation turn | +| `context_window_tokens` | int | 4,096–1,000,000 | Context window size | +| `model` | str | non-empty | LLM model to use | + +Other parameters (e.g. `workspace`, `provider_retry_mode`, `max_tool_result_chars`) can be set freely, as long as the value is JSON-safe. + +--- + +## Practical Scenarios + +### "This task is complex, I need more room" + +``` +Agent: This codebase is large, let me expand my context window to handle it. +→ my(action="set", key="context_window_tokens", value=131072) +``` + +### "Simple question, don't waste compute" + +``` +Agent: This is a straightforward question, let me switch to a faster model. +→ my(action="set", key="model", value="fast-model") +``` + +### "Remember user preferences across turns" + +``` +Turn 1: my(action="set", key="user_prefers_concise", value=True) +Turn 2: my(action="check", key="user_prefers_concise") +# → True (still remembers the user likes concise replies) +``` + +### "Self-diagnosis" + +``` +User: "Why aren't you searching the web?" +Agent: Let me check my web config. +→ my(action="check", key="web_config.enable") +# → False +Agent: Web search is disabled — please set web.enable: true in your config. +``` + +### "Token budget management" + +``` +Agent: Let me check how much budget I have left. +→ my(action="check", key="_last_usage") +# → {"prompt_tokens": 45000, "completion_tokens": 8000} +Agent: I've used ~53k tokens total so far. I'll keep my remaining replies concise. +``` + +### "Subagent monitoring" + +``` +Agent: Let me check on the background tasks. +→ my(action="check", key="subagents") +# → 2 subagent(s): +# [task-1] 'Code review' +# phase: running, iteration: 5, elapsed: 12.3s +# tools: read(✓), grep(✓) +# usage: {'prompt_tokens': 8000, 'completion_tokens': 1200} +# [task-2] 'Write tests' +# phase: pending, iteration: 0, elapsed: 0.2s +# tools: none +Agent: The code review is progressing well. The test task hasn't started yet. +``` + +--- + +## Safety Mechanisms + +Core design principle: **All modifications live in memory only. Restart restores defaults.** The agent cannot cause persistent damage. + +### Off-limits (BLOCKED) + +Cannot be checked or modified — fully hidden: + +| Category | Attributes | Reason | +|----------|-----------|--------| +| Core infrastructure | `bus`, `provider`, `_running` | Changes would crash the system | +| Tool registry | `tools` | Must not remove its own tools | +| Subsystems | `runner`, `sessions`, `consolidator`, etc. | Affects other users/sessions | +| Sensitive data | `_mcp_servers`, `_pending_queues`, etc. | Contains credentials and message routing | +| Security boundaries | `restrict_to_workspace`, `channels_config` | Bypassing would violate isolation | +| Python internals | `__class__`, `__dict__`, etc. | Prevents sandbox escape | + +### Read-only (check only) + +Can be checked but not set: + +| Category | Attributes | Reason | +|----------|-----------|--------| +| Subagent manager | `subagents` | Observable, but replacing breaks the system | +| Execution config | `exec_config` | Can check sandbox/enable status, cannot change it | +| Web config | `web_config` | Can check enable status, cannot change it | +| Iteration counter | `_current_iteration` | Updated by runner only | + +### Sensitive field protection + +Sub-fields matching sensitive names (`api_key`, `password`, `secret`, `token`, etc.) are blocked from both check and set, regardless of parent path. This prevents credential leaks via dot-path traversal (e.g. `web_config.search.api_key`). diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 39e1ce23a..5621f2f3d 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -27,6 +27,7 @@ from nanobot.agent.tools.notebook import NotebookEditTool from nanobot.agent.tools.registry import ToolRegistry from nanobot.agent.tools.search import GlobTool, GrepTool from nanobot.agent.tools.shell import ExecTool +from nanobot.agent.tools.self import MyTool from nanobot.agent.tools.spawn import SpawnTool from nanobot.agent.tools.web import WebFetchTool, WebSearchTool from nanobot.bus.events import InboundMessage, OutboundMessage @@ -39,7 +40,7 @@ from nanobot.utils.helpers import image_placeholder_text, truncate_text as trunc from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE if TYPE_CHECKING: - from nanobot.config.schema import ChannelsConfig, ExecToolConfig, WebToolsConfig + from nanobot.config.schema import ChannelsConfig, ExecToolConfig, ToolsConfig, WebToolsConfig from nanobot.cron.service import CronService @@ -88,6 +89,9 @@ class _LoopHook(AgentHook): await self._on_stream_end(resuming=resuming) self._stream_buf = "" + async def before_iteration(self, context: AgentHookContext) -> None: + self._loop._current_iteration = context.iteration + async def before_execute_tools(self, context: AgentHookContext) -> None: if self._on_progress: if not self._on_stream: @@ -154,9 +158,11 @@ class AgentLoop: hooks: list[AgentHook] | None = None, unified_session: bool = False, disabled_skills: list[str] | None = None, + tools_config: ToolsConfig | None = None, ): - from nanobot.config.schema import ExecToolConfig, WebToolsConfig + from nanobot.config.schema import ExecToolConfig, ToolsConfig, WebToolsConfig + _tc = tools_config or ToolsConfig() defaults = AgentDefaults() self.bus = bus self.channels_config = channels_config @@ -240,6 +246,10 @@ class AgentLoop: model=self.model, ) self._register_default_tools() + if _tc.my_enabled: + self.tools.register(MyTool(loop=self, modify_allowed=_tc.my_set)) + self._runtime_vars: dict[str, Any] = {} + self._current_iteration: int = 0 self.commands = CommandRouter() register_builtin_commands(self.commands) @@ -306,7 +316,7 @@ class AgentLoop: def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Update context for all tools that need routing info.""" - for name in ("message", "spawn", "cron"): + for name in ("message", "spawn", "cron", "my"): if tool := self.tools.get(name): if hasattr(tool, "set_context"): tool.set_context(channel, chat_id, *([message_id] if name == "message" else [])) @@ -420,6 +430,11 @@ class AgentLoop: self._last_usage = result.usage if result.stop_reason == "max_iterations": logger.warning("Max iterations ({}) reached", self.max_iterations) + # Push final content through stream so streaming channels (e.g. Feishu) + # update the card instead of leaving it empty. + if on_stream and on_stream_end: + await on_stream(result.final_content or "") + await on_stream_end(resuming=False) elif result.stop_reason == "error": logger.error("LLM returned error: {}", (result.final_content or "")[:200]) return result.final_content, result.tools_used, result.messages, result.stop_reason, result.had_injections diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index 571bcc792..3fb7d01fa 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -2,7 +2,9 @@ import asyncio import json +import time import uuid +from dataclasses import dataclass, field from pathlib import Path from typing import Any @@ -23,12 +25,29 @@ from nanobot.config.schema import ExecToolConfig, WebToolsConfig from nanobot.providers.base import LLMProvider -class _SubagentHook(AgentHook): - """Logging-only hook for subagent execution.""" +@dataclass(slots=True) +class SubagentStatus: + """Real-time status of a running subagent.""" - def __init__(self, task_id: str) -> None: + task_id: str + label: str + task_description: str + started_at: float # time.monotonic() + phase: str = "initializing" # initializing | awaiting_tools | tools_completed | final_response | done | error + iteration: int = 0 + tool_events: list = field(default_factory=list) # [{name, status, detail}, ...] + usage: dict = field(default_factory=dict) # token usage + stop_reason: str | None = None + error: str | None = None + + +class _SubagentHook(AgentHook): + """Hook for subagent execution — logs tool calls and updates status.""" + + def __init__(self, task_id: str, status: SubagentStatus | None = None) -> None: super().__init__() self._task_id = task_id + self._status = status async def before_execute_tools(self, context: AgentHookContext) -> None: for tool_call in context.tool_calls: @@ -38,6 +57,15 @@ class _SubagentHook(AgentHook): self._task_id, tool_call.name, args_str, ) + async def after_iteration(self, context: AgentHookContext) -> None: + if self._status is None: + return + self._status.iteration = context.iteration + self._status.tool_events = list(context.tool_events) + self._status.usage = dict(context.usage) + if context.error: + self._status.error = str(context.error) + class SubagentManager: """Manages background subagent execution.""" @@ -54,8 +82,6 @@ class SubagentManager: restrict_to_workspace: bool = False, disabled_skills: list[str] | None = None, ): - from nanobot.config.schema import ExecToolConfig - self.provider = provider self.workspace = workspace self.bus = bus @@ -67,6 +93,7 @@ class SubagentManager: self.disabled_skills = set(disabled_skills or []) self.runner = AgentRunner(provider) self._running_tasks: dict[str, asyncio.Task[None]] = {} + self._task_statuses: dict[str, SubagentStatus] = {} self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...} async def spawn( @@ -82,8 +109,16 @@ class SubagentManager: display_label = label or task[:30] + ("..." if len(task) > 30 else "") origin = {"channel": origin_channel, "chat_id": origin_chat_id} + status = SubagentStatus( + task_id=task_id, + label=display_label, + task_description=task, + started_at=time.monotonic(), + ) + self._task_statuses[task_id] = status + bg_task = asyncio.create_task( - self._run_subagent(task_id, task, display_label, origin) + self._run_subagent(task_id, task, display_label, origin, status) ) self._running_tasks[task_id] = bg_task if session_key: @@ -91,6 +126,7 @@ class SubagentManager: def _cleanup(_: asyncio.Task) -> None: self._running_tasks.pop(task_id, None) + self._task_statuses.pop(task_id, None) if session_key and (ids := self._session_tasks.get(session_key)): ids.discard(task_id) if not ids: @@ -107,10 +143,15 @@ class SubagentManager: task: str, label: str, origin: dict[str, str], + status: SubagentStatus, ) -> None: """Execute the subagent task and announce the result.""" logger.info("Subagent [{}] starting task: {}", task_id, label) + async def _on_checkpoint(payload: dict) -> None: + status.phase = payload.get("phase", status.phase) + status.iteration = payload.get("iteration", status.iteration) + try: # Build subagent tools (no message tool, no spawn tool) tools = ToolRegistry() @@ -145,40 +186,38 @@ class SubagentManager: model=self.model, max_iterations=15, max_tool_result_chars=self.max_tool_result_chars, - hook=_SubagentHook(task_id), + hook=_SubagentHook(task_id, status), max_iterations_message="Task completed but no final response was generated.", error_message=None, fail_on_tool_error=True, + checkpoint_callback=_on_checkpoint, )) - if result.stop_reason == "tool_error": - await self._announce_result( - task_id, - label, - task, - self._format_partial_progress(result), - origin, - "error", - ) - return - if result.stop_reason == "error": - await self._announce_result( - task_id, - label, - task, - result.error or "Error: subagent execution failed.", - origin, - "error", - ) - return - final_result = result.final_content or "Task completed but no final response was generated." + status.phase = "done" + status.stop_reason = result.stop_reason - logger.info("Subagent [{}] completed successfully", task_id) - await self._announce_result(task_id, label, task, final_result, origin, "ok") + if result.stop_reason == "tool_error": + status.tool_events = list(result.tool_events) + await self._announce_result( + task_id, label, task, + self._format_partial_progress(result), + origin, "error", + ) + elif result.stop_reason == "error": + await self._announce_result( + task_id, label, task, + result.error or "Error: subagent execution failed.", + origin, "error", + ) + else: + final_result = result.final_content or "Task completed but no final response was generated." + logger.info("Subagent [{}] completed successfully", task_id) + await self._announce_result(task_id, label, task, final_result, origin, "ok") except Exception as e: - error_msg = f"Error: {str(e)}" + status.phase = "error" + status.error = str(e) logger.error("Subagent [{}] failed: {}", task_id, e) - await self._announce_result(task_id, label, task, error_msg, origin, "error") + await self._announce_result(task_id, label, task, f"Error: {e}", origin, "error") async def _announce_result( self, diff --git a/nanobot/agent/tools/self.py b/nanobot/agent/tools/self.py new file mode 100644 index 000000000..6d863fea7 --- /dev/null +++ b/nanobot/agent/tools/self.py @@ -0,0 +1,439 @@ +"""MyTool: runtime state inspection and configuration for the agent loop.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING, Any + +from loguru import logger + +from nanobot.agent.subagent import SubagentStatus +from nanobot.agent.tools.base import Tool + +if TYPE_CHECKING: + from nanobot.agent.loop import AgentLoop + + +def _has_real_attr(obj: Any, key: str) -> bool: + """Check if obj has a real (explicitly set) attribute, not auto-generated by mock.""" + if isinstance(obj, dict): + return key in obj + d = getattr(obj, "__dict__", None) + if d is not None and key in d: + return True + for cls in type(obj).__mro__: + if key in cls.__dict__: + return True + return False + + +class MyTool(Tool): + """Check and set the agent loop's runtime configuration.""" + + BLOCKED = frozenset({ + # Core infrastructure + "bus", "provider", "_running", "tools", + # Config management + "_runtime_vars", + # Subsystems + "runner", "sessions", "consolidator", + "dream", "auto_compact", "context", "commands", + # Sensitive runtime state (credentials, message routing, task tracking) + "_mcp_servers", "_mcp_stacks", "_pending_queues", + "_session_locks", "_active_tasks", "_background_tasks", + # Security boundaries (inspect + modify both blocked) + "restrict_to_workspace", "channels_config", + "_concurrency_gate", "_unified_session", "_extra_hooks", + }) + + READ_ONLY = frozenset({ + "subagents", # observable but replacing it would break the system + "_current_iteration", # updated by runner only + "exec_config", # inspect allowed (e.g. check sandbox), modify blocked + "web_config", # inspect allowed (e.g. check enable), modify blocked + }) + + _DENIED_ATTRS = frozenset({ + "__class__", "__dict__", "__bases__", "__subclasses__", "__mro__", + "__init__", "__new__", "__reduce__", "__getstate__", "__setstate__", + "__del__", "__call__", "__getattr__", "__setattr__", "__delattr__", + "__code__", "__globals__", "func_globals", "func_code", + "__wrapped__", "__closure__", + }) + + # Sub-field names that are sensitive regardless of parent path + _SENSITIVE_NAMES = frozenset({ + "api_key", "secret", "password", "token", "credential", + "private_key", "access_token", "refresh_token", "auth", + }) + + RESTRICTED: dict[str, dict[str, Any]] = { + "max_iterations": {"type": int, "min": 1, "max": 100}, + "context_window_tokens": {"type": int, "min": 4096, "max": 1_000_000}, + "model": {"type": str, "min_len": 1}, + } + + _MAX_RUNTIME_KEYS = 64 + + def __init__(self, loop: AgentLoop, modify_allowed: bool = True) -> None: + self._loop = loop + self._modify_allowed = modify_allowed + self._channel = "" + self._chat_id = "" + + def __deepcopy__(self, memo: dict[int, Any]) -> MyTool: + cls = self.__class__ + result = cls.__new__(cls) + memo[id(self)] = result + result._loop = self._loop + result._modify_allowed = self._modify_allowed + result._channel = self._channel + result._chat_id = self._chat_id + return result + + def set_context(self, channel: str, chat_id: str) -> None: + self._channel = channel + self._chat_id = chat_id + + @property + def name(self) -> str: + return "my" + + @property + def description(self) -> str: + base = ( + "Check and set your own runtime state.\n" + "Actions: check, set.\n" + "- check (no key): full config overview — start here.\n" + "- check (key): drill into a value. Dot-paths allowed " + "(e.g. '_last_usage.prompt_tokens', 'web_config.enable').\n" + "- set (key, value): change config or store notes in your scratchpad. " + "Scratchpad keys persist across turns but not restarts.\n" + "Key values: _current_iteration (current progress), " + "max_iterations - _current_iteration = remaining iterations.\n" + "Note: web_config and exec_config are readable but read-only.\n" + "\n" + "When to use:\n" + "- User asks about your model, settings, or token usage → check that key.\n" + "- A tool fails or behaves unexpectedly → check the related config to diagnose.\n" + "- User asks you to remember a preference for this session → set to store it in your scratchpad.\n" + "- About to start a large task → check context_window_tokens and max_iterations first." + ) + if not self._modify_allowed: + base += "\nREAD-ONLY MODE: set is disabled." + else: + base += ( + "\nIMPORTANT: Before setting state, predict the potential impact. " + "If the operation could cause crashes or instability " + "(e.g. changing model), warn the user first." + ) + return base + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": ["check", "set"], + "description": "Action to perform", + }, + "key": { + "type": "string", + "description": "Dot-path for check/set. Examples: 'max_iterations', 'workspace', 'provider_retry_mode'. " + "For check without key, shows all config values.", + }, + "value": {"description": "New value (for set). Type must match target (int for max_iterations/context_window_tokens, str for model)."}, + }, + "required": ["action"], + } + + def _audit(self, action: str, detail: str) -> None: + session = f"{self._channel}:{self._chat_id}" if self._channel else "unknown" + logger.info("self.{} | {} | session:{}", action, detail, session) + + # ------------------------------------------------------------------ + # Path resolution + # ------------------------------------------------------------------ + + def _resolve_path(self, path: str) -> tuple[Any, str | None]: + parts = path.split(".") + obj = self._loop + for part in parts: + if part in self._DENIED_ATTRS or part.startswith("__"): + return None, f"'{part}' is not accessible" + if part in self.BLOCKED: + return None, f"'{part}' is not accessible" + if part.lower() in self._SENSITIVE_NAMES: + return None, f"'{part}' is not accessible" + try: + if isinstance(obj, dict): + if part in obj: + obj = obj[part] + else: + return None, f"'{part}' not found in dict" + else: + obj = getattr(obj, part) + except (KeyError, AttributeError) as e: + return None, f"'{part}' not found: {e}" + return obj, None + + @staticmethod + def _validate_key(key: str | None, label: str = "key") -> str | None: + if not key or not key.strip(): + return f"Error: '{label}' cannot be empty or whitespace" + return None + + # ------------------------------------------------------------------ + # Smart formatting + # ------------------------------------------------------------------ + + @staticmethod + def _format_status(st: SubagentStatus, indent: str = " ") -> str: + elapsed = time.monotonic() - st.started_at + tool_summary = ", ".join( + f"{e.get('name', '?')}({e.get('status', '?')})" for e in st.tool_events[-5:] + ) or "none" + lines = [ + f"{indent}phase: {st.phase}, iteration: {st.iteration}, elapsed: {elapsed:.1f}s", + f"{indent}tools: {tool_summary}", + f"{indent}usage: {st.usage or 'n/a'}", + ] + if st.error: + lines.append(f"{indent}error: {st.error}") + if st.stop_reason: + lines.append(f"{indent}stop_reason: {st.stop_reason}") + return "\n".join(lines) + + @staticmethod + def _format_value(val: Any, key: str = "") -> str: + if isinstance(val, SubagentStatus): + header = f"Subagent [{val.task_id}] '{val.label}'" + detail = MyTool._format_status(val, " ") + return f"{header}\n task: {val.task_description}\n{detail}" + # SubagentManager: delegate to its _task_statuses dict + if hasattr(val, "_task_statuses") and isinstance(val._task_statuses, dict): + return MyTool._format_value(val._task_statuses, key) + if isinstance(val, dict) and val and isinstance(next(iter(val.values())), SubagentStatus): + prefix = f"{key}: " if key else "" + lines = [f"{prefix}{len(val)} subagent(s):"] + for tid, st in val.items(): + detail = MyTool._format_status(st, " ") + lines.append(f" [{tid}] '{st.label}'\n{detail}") + return "\n".join(lines) + if hasattr(val, "tool_names"): + return f"tools: {len(val.tool_names)} registered — {val.tool_names}" + # Scalar types — repr is fine + if isinstance(val, (str, int, float, bool, type(None))): + r = repr(val) + return f"{key}: {r}" if key else r + # Dict — small: show content; large: show keys for dot-path navigation + if isinstance(val, dict): + ks = list(val.keys()) + if not ks: + return f"{key}: {{}}" if key else "{}" + if len(ks) <= 5: + r = repr(val) + if len(r) <= 200: + return f"{key}: {r}" if key else r + preview = ", ".join(str(k) for k in ks[:15]) + suffix = ", ..." if len(ks) > 15 else "" + return f"{key}: {{{preview}{suffix}}}" if key else f"{{{preview}{suffix}}}" + # List/tuple — count for large, repr for small + if isinstance(val, (list, tuple)): + if len(val) > 20: + return f"{key}: [{len(val)} items]" if key else f"[{len(val)} items]" + r = repr(val) + return f"{key}: {r}" if key else r + # Complex object — small Pydantic models: show values; others: show field names for navigation + cls_name = type(val).__name__ + if hasattr(val, "model_fields"): + fields = list(val.model_fields.keys()) + if len(fields) <= 8: + # Small config objects: show field=value pairs + pairs = [] + for f in fields: + fv = getattr(val, f, "?") + if isinstance(fv, (str, int, float, bool, type(None))): + pairs.append(f"{f}={fv!r}") + else: + pairs.append(f"{f}=<{type(fv).__name__}>") + preview = ", ".join(pairs) + return f"{key}: {preview}" if key else preview + else: + fields = [a for a in getattr(val, "__dict__", {}) if not a.startswith("__")] + if fields: + preview = ", ".join(str(f) for f in fields[:20]) + suffix = ", ..." if len(fields) > 20 else "" + return f"{key}: <{cls_name}> [{preview}{suffix}]" if key else f"<{cls_name}> [{preview}{suffix}]" + r = repr(val) + return f"{key}: {r}" if key else r + + # ------------------------------------------------------------------ + # Action dispatch + # ------------------------------------------------------------------ + + async def execute( + self, + action: str, + key: str | None = None, + value: Any = None, + **_kwargs: Any, + ) -> str: + if action in ("inspect", "check"): + return self._inspect(key) + if not self._modify_allowed: + return "Error: set is disabled (my_set is False)" + if action in ("modify", "set"): + return self._modify(key, value) + return f"Unknown action: {action}" + + # -- inspect -- + + def _inspect(self, key: str | None) -> str: + if not key: + return self._inspect_all() + top = key.split(".")[0] + if top in self._DENIED_ATTRS or top.startswith("__"): + return f"Error: '{top}' is not accessible" + obj, err = self._resolve_path(key) + if err: + # "scratchpad" alias for _runtime_vars + if key == "scratchpad": + rv = self._loop._runtime_vars + return self._format_value(rv, "scratchpad") if rv else "scratchpad is empty" + # Fallback: check _runtime_vars for simple keys stored by modify + if "." not in key and key in self._loop._runtime_vars: + return self._format_value(self._loop._runtime_vars[key], key) + return f"Error: {err}" + # Guard against mock auto-generated attributes + if "." not in key and not _has_real_attr(self._loop, key): + if key in self._loop._runtime_vars: + return self._format_value(self._loop._runtime_vars[key], key) + return f"Error: '{key}' not found" + return self._format_value(obj, key) + + def _inspect_all(self) -> str: + loop = self._loop + parts: list[str] = [] + # RESTRICTED keys + for k in self.RESTRICTED: + parts.append(self._format_value(getattr(loop, k, None), k)) + # Other useful top-level keys shown in description + for k in ("workspace", "provider_retry_mode", "max_tool_result_chars", "_current_iteration", "web_config", "exec_config", "subagents"): + if _has_real_attr(loop, k): + parts.append(self._format_value(getattr(loop, k, None), k)) + # Token usage + usage = loop._last_usage + if usage: + parts.append(self._format_value(usage, "_last_usage")) + rv = loop._runtime_vars + if rv: + parts.append(self._format_value(rv, "scratchpad")) + return "\n".join(parts) + + # -- modify -- + + def _modify(self, key: str | None, value: Any) -> str: + if err := self._validate_key(key): + return err + top = key.split(".")[0] + if top in self.BLOCKED or top in self._DENIED_ATTRS or top.startswith("__") or top.lower() in self._SENSITIVE_NAMES: + self._audit("modify", f"BLOCKED {key}") + return f"Error: '{key}' is protected and cannot be modified" + if top in self.READ_ONLY: + self._audit("modify", f"READ_ONLY {key}") + return f"Error: '{key}' is read-only and cannot be modified" + if "." in key: + parent_path, leaf = key.rsplit(".", 1) + if leaf in self._DENIED_ATTRS or leaf.startswith("__"): + self._audit("modify", f"BLOCKED leaf '{leaf}'") + return f"Error: '{leaf}' is not accessible" + if leaf.lower() in self._SENSITIVE_NAMES: + self._audit("modify", f"BLOCKED sensitive leaf '{leaf}'") + return f"Error: '{leaf}' is not accessible" + parent, err = self._resolve_path(parent_path) + if err: + return f"Error: {err}" + if isinstance(parent, dict): + parent[leaf] = value + else: + setattr(parent, leaf, value) + self._audit("modify", f"{key} = {value!r}") + return f"Set {key} = {value!r}" + if key in self.RESTRICTED: + return self._modify_restricted(key, value) + return self._modify_free(key, value) + + def _modify_restricted(self, key: str, value: Any) -> str: + spec = self.RESTRICTED[key] + expected = spec["type"] + if expected is int and isinstance(value, bool): + return f"Error: '{key}' must be {expected.__name__}, got bool" + if not isinstance(value, expected): + try: + value = expected(value) + except (ValueError, TypeError): + return f"Error: '{key}' must be {expected.__name__}, got {type(value).__name__}" + old = getattr(self._loop, key) + if "min" in spec and value < spec["min"]: + return f"Error: '{key}' must be >= {spec['min']}" + if "max" in spec and value > spec["max"]: + return f"Error: '{key}' must be <= {spec['max']}" + if "min_len" in spec and len(str(value)) < spec["min_len"]: + return f"Error: '{key}' must be at least {spec['min_len']} characters" + setattr(self._loop, key, value) + self._audit("modify", f"{key}: {old!r} -> {value!r}") + return f"Set {key} = {value!r} (was {old!r})" + + def _modify_free(self, key: str, value: Any) -> str: + if _has_real_attr(self._loop, key): + old = getattr(self._loop, key) + if isinstance(old, (str, int, float, bool)): + old_t, new_t = type(old), type(value) + if old_t is float and new_t is int: + pass # int → float coercion allowed + elif old_t is not new_t: + self._audit( + "modify", + f"REJECTED type mismatch {key}: expects {old_t.__name__}, got {new_t.__name__}", + ) + return f"Error: '{key}' expects {old_t.__name__}, got {new_t.__name__}" + setattr(self._loop, key, value) + self._audit("modify", f"{key}: {old!r} -> {value!r}") + return f"Set {key} = {value!r} (was {old!r})" + if callable(value): + self._audit("modify", f"REJECTED callable {key}") + return "Error: cannot store callable values" + err = self._validate_json_safe(value) + if err: + self._audit("modify", f"REJECTED {key}: {err}") + return f"Error: {err}" + if key not in self._loop._runtime_vars and len(self._loop._runtime_vars) >= self._MAX_RUNTIME_KEYS: + self._audit("modify", f"REJECTED {key}: max keys ({self._MAX_RUNTIME_KEYS}) reached") + return f"Error: scratchpad is full (max {self._MAX_RUNTIME_KEYS} keys). Remove unused keys first." + old = self._loop._runtime_vars.get(key) + self._loop._runtime_vars[key] = value + self._audit("modify", f"scratchpad.{key}: {old!r} -> {value!r}") + return f"Set scratchpad.{key} = {value!r}" + + @classmethod + def _validate_json_safe(cls, value: Any, depth: int = 0) -> str | None: + if depth > 10: + return "value nesting too deep (max 10 levels)" + if isinstance(value, (str, int, float, bool, type(None))): + return None + if isinstance(value, list): + for i, item in enumerate(value): + if err := cls._validate_json_safe(item, depth + 1): + return f"list[{i}] contains {err}" + return None + if isinstance(value, dict): + for k, v in value.items(): + if not isinstance(k, str): + return f"dict key must be str, got {type(k).__name__}" + if err := cls._validate_json_safe(v, depth + 1): + return f"dict key '{k}' contains {err}" + return None + return f"unsupported type {type(value).__name__}" diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 48400bbd9..309a5b478 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -593,6 +593,7 @@ def serve( unified_session=runtime_config.agents.defaults.unified_session, disabled_skills=runtime_config.agents.defaults.disabled_skills, session_ttl_minutes=runtime_config.agents.defaults.session_ttl_minutes, + tools_config=runtime_config.tools, ) model_name = runtime_config.agents.defaults.model @@ -687,6 +688,7 @@ def gateway( unified_session=config.agents.defaults.unified_session, disabled_skills=config.agents.defaults.disabled_skills, session_ttl_minutes=config.agents.defaults.session_ttl_minutes, + tools_config=config.tools, ) # Set cron callback (needs agent) @@ -921,6 +923,7 @@ def agent( unified_session=config.agents.defaults.unified_session, disabled_skills=config.agents.defaults.disabled_skills, session_ttl_minutes=config.agents.defaults.session_ttl_minutes, + tools_config=config.tools, ) restart_notice = consume_restart_notice_from_env() if restart_notice and should_show_cli_restart_notice(restart_notice, session_id): diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index aa5ab9932..647bde2b0 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -206,6 +206,8 @@ class ToolsConfig(Base): restrict_to_workspace: bool = False # restrict all tool access to workspace directory mcp_servers: dict[str, MCPServerConfig] = Field(default_factory=dict) ssrf_whitelist: list[str] = Field(default_factory=list) # CIDR ranges to exempt from SSRF blocking (e.g. ["100.64.0.0/10"] for Tailscale) + my_enabled: bool = True # enable the my tool (agent runtime state inspection) + my_set: bool = False # allow my tool to set state (read-only if False) class Config(BaseSettings): diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py index 44560a588..96102e3d2 100644 --- a/nanobot/nanobot.py +++ b/nanobot/nanobot.py @@ -84,6 +84,7 @@ class Nanobot: unified_session=defaults.unified_session, disabled_skills=defaults.disabled_skills, session_ttl_minutes=defaults.session_ttl_minutes, + tools_config=config.tools, ) return cls(loop) diff --git a/nanobot/skills/my/SKILL.md b/nanobot/skills/my/SKILL.md new file mode 100644 index 000000000..6f83e8e4b --- /dev/null +++ b/nanobot/skills/my/SKILL.md @@ -0,0 +1,72 @@ +--- +name: my +description: Check and set the agent's own runtime state (model, iterations, context window, token usage, web config). Use when diagnosing why something doesn't work ("why can't you search the web?", "why did you stop?"), checking resource limits before complex tasks, adapting configuration for long or simple tasks, or remembering user preferences across turns. Also use when the user asks what model you are running, how many tokens you've used, or what your settings are. +always: true +--- + +# Self-Awareness + +## How to use + +1. **Identify the situation** from the categories below +2. **Call the my tool** with the appropriate action +3. **If set**, warn the user before changing impactful settings (model, iterations) +4. **For detailed examples**, read [references/examples.md](references/examples.md) + +## When to check + + +**Diagnose before explaining.** When something doesn't work, check your state first. + + + +**Check budget before complex tasks.** Know your limits before committing. + + + +**Recall across turns.** Store preferences in your scratchpad, read them back later. + + +## When to set + + +**Only set when benefit is clear and user is informed.** Warn before changing model. + + +| Situation | Command | +|-----------|---------| +| Large codebase analysis | `my(action="set", key="context_window_tokens", value=131072)` | +| Repetitive simple tasks | `my(action="set", key="model", value="")` | +| Long multi-step task | `my(action="set", key="max_iterations", value=80)` | + +**Tradeoff:** Bias toward stability. Only set when defaults are genuinely insufficient. + +## Anti-patterns + + +**Don't check every turn.** Costs a tool call. Use when you need information, not reflexively. + + + +**Don't store sensitive data.** No API keys, passwords, or tokens in scratchpad. + + + +**Don't set workspace.** Does not update file tool boundaries — won't work. + + +## Constraints + +- All modifications in-memory only — restart resets everything +- Protected params have type/range validation: `max_iterations` (1–100), `context_window_tokens` (4096–1M), `model` (non-empty str) +- If `my_set` is false, check only + +## Related tools + +| Need | Use | Persists? | +|------|-----|-----------| +| Per-session temp state | `my(action="set", key="...", value=...)` | No | +| Long-term facts | Memory skill (`MEMORY.md`, `USER.md`) | Yes | +| Permanent config change | Edit config file | Yes | + +**Rule of thumb:** Tomorrow? Memory. This turn only? My. diff --git a/nanobot/skills/my/references/examples.md b/nanobot/skills/my/references/examples.md new file mode 100644 index 000000000..9f8e8d08b --- /dev/null +++ b/nanobot/skills/my/references/examples.md @@ -0,0 +1,75 @@ +# My Tool — Practical Examples + +Concrete scenarios showing when and how to use the my tool effectively. + +## Diagnosis + +### "Why can't you search the web?" +``` +→ my(action="check", key="web_config.enable") + → False +→ "Web search is disabled. Add web.enable: true to your config to enable it." +``` + +### "Why did you stop?" +``` +→ my(action="check", key="max_iterations") + → 40 +→ my(action="check", key="_last_usage") + → {"prompt_tokens": 62000, "completion_tokens": 3000} +→ "I hit the iteration limit (40). The task was complex. I can ask the user if they want to increase it." +``` + +### "What model are you running?" +``` +→ my(action="check", key="model") + → 'anthropic/claude-sonnet-4-20250514' +``` + +## Adaptive Behavior + +### Large codebase analysis +``` +→ my(action="check") + → context_window_tokens: 65536 +→ my(action="set", key="context_window_tokens", value=131072) + → "Set context_window_tokens = 131072 (was 65536)" +→ "I've expanded my context window to handle this large codebase." +``` + +### Switching to a faster model for repetitive tasks +``` +→ my(action="set", key="model", value="anthropic/claude-haiku-4-5-20251001") + → "Set model = 'anthropic/claude-haiku-4-5-20251001' (was 'anthropic/claude-sonnet-4-20250514')" +→ "Switched to a faster model for these batch tasks." +``` + +## Cross-Turn Memory + +### Remembering user preferences +``` +# Turn 1: user says "keep it brief" +→ my(action="set", key="user_style", value="concise") + → "Set scratchpad.user_style = 'concise'" + +# Turn 3: new topic +→ my(action="check", key="user_style") + → 'concise' + (adjusts response style accordingly) +``` + +### Tracking project context +``` +→ my(action="set", key="active_branch", value="feat/auth") +→ my(action="set", key="test_framework", value="pytest") +→ my(action="set", key="has_docker", value=true) +``` + +## Budget Awareness + +### Token-conscious behavior +``` +→ my(action="check", key="_last_usage") + → {"prompt_tokens": 58000, "completion_tokens": 12000} +→ "I've consumed ~70k tokens. I'll keep my remaining responses focused." +``` diff --git a/tests/agent/__init__.py b/tests/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/agent/test_runner.py b/tests/agent/test_runner.py index 74025d779..2f143b5f8 100644 --- a/tests/agent/test_runner.py +++ b/tests/agent/test_runner.py @@ -1018,7 +1018,7 @@ async def test_runner_tool_error_sets_final_content(): @pytest.mark.asyncio async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, monkeypatch): - from nanobot.agent.subagent import SubagentManager + from nanobot.agent.subagent import SubagentManager, SubagentStatus from nanobot.bus.queue import MessageBus bus = MessageBus() @@ -1041,7 +1041,8 @@ async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, mon monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute) - await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) + status = SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic()) + await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status) mgr._announce_result.assert_awaited_once() args = mgr._announce_result.await_args.args diff --git a/tests/agent/test_task_cancel.py b/tests/agent/test_task_cancel.py index 7e84e57d8..c1c36ca8a 100644 --- a/tests/agent/test_task_cancel.py +++ b/tests/agent/test_task_cancel.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import time from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch @@ -269,7 +270,9 @@ class TestSubagentCancellation: monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute) - await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) + from nanobot.agent.subagent import SubagentStatus + status = SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic()) + await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status) assistant_messages = [ msg for msg in captured_second_call @@ -308,7 +311,9 @@ class TestSubagentCancellation: mgr.runner.run = AsyncMock(side_effect=fake_run) - await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) + from nanobot.agent.subagent import SubagentStatus + status = SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic()) + await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status) mgr.runner.run.assert_awaited_once() mgr._announce_result.assert_awaited_once() @@ -344,7 +349,9 @@ class TestSubagentCancellation: monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute) - await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) + from nanobot.agent.subagent import SubagentStatus + status = SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic()) + await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, status) mgr._announce_result.assert_awaited_once() args = mgr._announce_result.await_args.args @@ -356,7 +363,7 @@ class TestSubagentCancellation: @pytest.mark.asyncio async def test_cancel_by_session_cancels_running_subagent_tool(self, monkeypatch, tmp_path): - from nanobot.agent.subagent import SubagentManager + from nanobot.agent.subagent import SubagentManager, SubagentStatus from nanobot.bus.queue import MessageBus from nanobot.providers.base import LLMResponse, ToolCallRequest @@ -389,7 +396,10 @@ class TestSubagentCancellation: monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute) task = asyncio.create_task( - mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}) + mgr._run_subagent( + "sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"}, + SubagentStatus(task_id="sub-1", label="label", task_description="do task", started_at=time.monotonic()), + ) ) mgr._running_tasks["sub-1"] = task mgr._session_tasks["test:c1"] = {"sub-1"} diff --git a/tests/agent/tools/__init__.py b/tests/agent/tools/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/agent/tools/test_self_tool.py b/tests/agent/tools/test_self_tool.py new file mode 100644 index 000000000..50eb5feaa --- /dev/null +++ b/tests/agent/tools/test_self_tool.py @@ -0,0 +1,1105 @@ +"""Tests for MyTool — runtime state inspection and configuration.""" + +from __future__ import annotations + +import time +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nanobot.agent.tools.self import MyTool + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_mock_loop(**overrides): + """Build a lightweight mock AgentLoop with the attributes MyTool reads.""" + loop = MagicMock() + loop.model = "anthropic/claude-sonnet-4-20250514" + loop.max_iterations = 40 + loop.context_window_tokens = 65_536 + loop.workspace = Path("/tmp/workspace") + loop.restrict_to_workspace = False + loop._start_time = 1000.0 + loop.exec_config = MagicMock() + loop.channels_config = MagicMock() + loop._last_usage = {"prompt_tokens": 100, "completion_tokens": 50} + loop._runtime_vars = {} + loop._current_iteration = 0 + loop.provider_retry_mode = "standard" + loop.max_tool_result_chars = 16000 + loop._concurrency_gate = None + loop._unified_session = False + loop._extra_hooks = [] + + # web_config mock — needed for check tests + loop.web_config = MagicMock() + loop.web_config.enable = True + loop.web_config.search = MagicMock() + loop.web_config.search.api_key = "sk-secret-key-12345" + + # Tools registry mock + loop.tools = MagicMock() + loop.tools.tool_names = ["read_file", "write_file", "exec", "web_search", "self"] + loop.tools.has.side_effect = lambda n: n in loop.tools.tool_names + loop.tools.get.return_value = None + + # SubagentManager mock + loop.subagents = MagicMock() + loop.subagents._running_tasks = {"abc123": MagicMock(done=MagicMock(return_value=False))} + loop.subagents.get_running_count = MagicMock(return_value=1) + + for k, v in overrides.items(): + setattr(loop, k, v) + + return loop + + +def _make_tool(loop=None): + if loop is None: + loop = _make_mock_loop() + return MyTool(loop=loop) + + +# --------------------------------------------------------------------------- +# check — no key (summary) +# --------------------------------------------------------------------------- + +class TestInspectSummary: + + @pytest.mark.asyncio + async def test_inspect_returns_current_state(self): + tool = _make_tool() + result = await tool.execute(action="check") + assert "max_iterations: 40" in result + assert "context_window_tokens: 65536" in result + + @pytest.mark.asyncio + async def test_inspect_includes_runtime_vars(self): + loop = _make_mock_loop() + loop._runtime_vars = {"task": "review"} + tool = _make_tool(loop) + result = await tool.execute(action="check") + assert "task" in result + + @pytest.mark.asyncio + async def test_inspect_summary_shows_all_description_keys(self): + """check without key should show all top-level keys listed in description.""" + tool = _make_tool() + result = await tool.execute(action="check") + assert "max_iterations" in result + assert "context_window_tokens" in result + assert "model" in result + assert "workspace" in result + assert "provider_retry_mode" in result + assert "max_tool_result_chars" in result + assert "_last_usage" in result + assert "_current_iteration" in result + + +# --------------------------------------------------------------------------- +# check — single key (direct) +# --------------------------------------------------------------------------- + +class TestInspectSingleKey: + + @pytest.mark.asyncio + async def test_inspect_simple_value(self): + tool = _make_tool() + result = await tool.execute(action="check", key="max_iterations") + assert "40" in result + + @pytest.mark.asyncio + async def test_inspect_blocked_returns_error(self): + tool = _make_tool() + result = await tool.execute(action="check", key="bus") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_inspect_dunder_blocked(self): + tool = _make_tool() + for attr in ("__class__", "__dict__", "__bases__", "__subclasses__", "__mro__"): + result = await tool.execute(action="check", key=attr) + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_inspect_nonexistent_returns_not_found(self): + tool = _make_tool() + result = await tool.execute(action="check", key="nonexistent_attr_xyz") + assert "not found" in result + + +# --------------------------------------------------------------------------- +# check — dot-path navigation +# --------------------------------------------------------------------------- + +class TestInspectPathNavigation: + + @pytest.mark.asyncio + async def test_inspect_config_subfield(self): + loop = _make_mock_loop() + loop.web_config = MagicMock() + loop.web_config.enable = True + tool = _make_tool(loop) + result = await tool.execute(action="check", key="web_config.enable") + assert "True" in result + + @pytest.mark.asyncio + async def test_inspect_dict_key_via_dotpath(self): + loop = _make_mock_loop() + loop._last_usage = {"prompt_tokens": 100, "completion_tokens": 50} + tool = _make_tool(loop) + result = await tool.execute(action="check", key="_last_usage.prompt_tokens") + assert "100" in result + + @pytest.mark.asyncio + async def test_inspect_blocked_in_path(self): + tool = _make_tool() + result = await tool.execute(action="check", key="bus.foo") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_inspect_tools_returns_blocked(self): + """tools is BLOCKED — check should return access error.""" + tool = _make_tool() + result = await tool.execute(action="check", key="tools") + assert "not accessible" in result + + + +# --------------------------------------------------------------------------- +# set — restricted (with validation) +# --------------------------------------------------------------------------- + +class TestModifyRestricted: + + @pytest.mark.asyncio + async def test_modify_restricted_valid(self): + tool = _make_tool() + result = await tool.execute(action="set", key="max_iterations", value=80) + assert "Set max_iterations = 80" in result + assert tool._loop.max_iterations == 80 + + @pytest.mark.asyncio + async def test_modify_restricted_out_of_range(self): + tool = _make_tool() + result = await tool.execute(action="set", key="max_iterations", value=0) + assert "Error" in result + assert tool._loop.max_iterations == 40 + + @pytest.mark.asyncio + async def test_modify_restricted_max_exceeded(self): + tool = _make_tool() + result = await tool.execute(action="set", key="max_iterations", value=999) + assert "Error" in result + + @pytest.mark.asyncio + async def test_modify_restricted_wrong_type(self): + tool = _make_tool() + result = await tool.execute(action="set", key="max_iterations", value="not_an_int") + assert "Error" in result + + @pytest.mark.asyncio + async def test_modify_restricted_bool_rejected(self): + tool = _make_tool() + result = await tool.execute(action="set", key="max_iterations", value=True) + assert "Error" in result + + @pytest.mark.asyncio + async def test_modify_string_int_coerced(self): + tool = _make_tool() + result = await tool.execute(action="set", key="max_iterations", value="80") + assert tool._loop.max_iterations == 80 + + @pytest.mark.asyncio + async def test_modify_context_window_valid(self): + tool = _make_tool() + result = await tool.execute(action="set", key="context_window_tokens", value=131072) + assert tool._loop.context_window_tokens == 131072 + + @pytest.mark.asyncio + async def test_modify_none_value_for_restricted_int(self): + tool = _make_tool() + result = await tool.execute(action="set", key="max_iterations", value=None) + assert "Error" in result + + +# --------------------------------------------------------------------------- +# set — blocked (minimal set) +# --------------------------------------------------------------------------- + +class TestModifyBlocked: + + @pytest.mark.asyncio + async def test_modify_bus_blocked(self): + tool = _make_tool() + result = await tool.execute(action="set", key="bus", value="hacked") + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_provider_blocked(self): + tool = _make_tool() + result = await tool.execute(action="set", key="provider", value=None) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_running_blocked(self): + tool = _make_tool() + result = await tool.execute(action="set", key="_running", value=True) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_dunder_blocked(self): + tool = _make_tool() + result = await tool.execute(action="set", key="__class__", value="evil") + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_dotpath_leaf_dunder_blocked(self): + """Fix 3.1: leaf segment of dot-path must also be validated.""" + tool = _make_tool() + result = await tool.execute( + action="set", + key="provider_retry_mode.__class__", + value="evil", + ) + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_modify_dotpath_leaf_denied_attr_blocked(self): + """Fix 3.1: leaf segment matching _DENIED_ATTRS must be rejected.""" + tool = _make_tool() + result = await tool.execute( + action="set", + key="provider_retry_mode.__globals__", + value={}, + ) + assert "not accessible" in result + + +# --------------------------------------------------------------------------- +# set — free tier (setattr priority) +# --------------------------------------------------------------------------- + +class TestModifyFree: + + @pytest.mark.asyncio + async def test_modify_existing_attr_setattr(self): + """Modifying an existing loop attribute should use setattr.""" + tool = _make_tool() + result = await tool.execute(action="set", key="provider_retry_mode", value="persistent") + assert "Set provider_retry_mode" in result + assert tool._loop.provider_retry_mode == "persistent" + + @pytest.mark.asyncio + async def test_modify_new_key_stores_in_runtime_vars(self): + """Modifying a non-existing attribute should store in _runtime_vars.""" + tool = _make_tool() + result = await tool.execute(action="set", key="my_custom_var", value="hello") + assert "my_custom_var" in result + assert tool._loop._runtime_vars["my_custom_var"] == "hello" + + @pytest.mark.asyncio + async def test_modify_rejects_callable(self): + tool = _make_tool() + result = await tool.execute(action="set", key="evil", value=lambda: None) + assert "callable" in result + + @pytest.mark.asyncio + async def test_modify_rejects_complex_objects(self): + tool = _make_tool() + result = await tool.execute(action="set", key="obj", value=Path("/tmp")) + assert "Error" in result + + @pytest.mark.asyncio + async def test_modify_allows_list(self): + tool = _make_tool() + result = await tool.execute(action="set", key="items", value=[1, 2, 3]) + assert tool._loop._runtime_vars["items"] == [1, 2, 3] + + @pytest.mark.asyncio + async def test_modify_allows_dict(self): + tool = _make_tool() + result = await tool.execute(action="set", key="data", value={"a": 1}) + assert tool._loop._runtime_vars["data"] == {"a": 1} + + @pytest.mark.asyncio + async def test_modify_whitespace_key_rejected(self): + tool = _make_tool() + result = await tool.execute(action="set", key=" ", value="test") + assert "cannot be empty or whitespace" in result + + @pytest.mark.asyncio + async def test_modify_nested_dict_with_object_rejected(self): + tool = _make_tool() + result = await tool.execute(action="set", key="evil", value={"nested": object()}) + assert "Error" in result + + @pytest.mark.asyncio + async def test_modify_deep_nesting_rejected(self): + tool = _make_tool() + deep = {"level": 0} + current = deep + for i in range(1, 15): + current["child"] = {"level": i} + current = current["child"] + result = await tool.execute(action="set", key="deep", value=deep) + assert "nesting too deep" in result + + @pytest.mark.asyncio + async def test_modify_dict_with_non_str_key_rejected(self): + tool = _make_tool() + result = await tool.execute(action="set", key="evil", value={42: "value"}) + assert "key must be str" in result + + @pytest.mark.asyncio + async def test_modify_existing_attr_type_mismatch_rejected(self): + """Setting a string attr to int should be rejected.""" + tool = _make_tool() + result = await tool.execute(action="set", key="provider_retry_mode", value=42) + assert "Error" in result + assert "str" in result + assert tool._loop.provider_retry_mode == "standard" + + @pytest.mark.asyncio + async def test_modify_existing_int_attr_wrong_type_rejected(self): + """Setting an int attr to string should be rejected.""" + tool = _make_tool() + result = await tool.execute(action="set", key="max_tool_result_chars", value="big") + assert "Error" in result + assert tool._loop.max_tool_result_chars == 16000 + + +# --------------------------------------------------------------------------- +# set — previously BLOCKED/READONLY now open +# --------------------------------------------------------------------------- + +class TestModifyOpen: + + @pytest.mark.asyncio + async def test_modify_tools_blocked(self): + """tools is BLOCKED — cannot be replaced.""" + tool = _make_tool() + new_registry = MagicMock() + result = await tool.execute(action="set", key="tools", value=new_registry) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_subagents_blocked(self): + """subagents is READ_ONLY — cannot be replaced.""" + tool = _make_tool() + new_subagents = MagicMock() + result = await tool.execute(action="set", key="subagents", value=new_subagents) + assert "read-only" in result + + @pytest.mark.asyncio + async def test_modify_runner_blocked(self): + """runner is BLOCKED — cannot be replaced.""" + tool = _make_tool() + new_runner = MagicMock() + result = await tool.execute(action="set", key="runner", value=new_runner) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_sessions_blocked(self): + """sessions is BLOCKED — cannot be replaced.""" + tool = _make_tool() + new_sessions = MagicMock() + result = await tool.execute(action="set", key="sessions", value=new_sessions) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_consolidator_blocked(self): + """consolidator is BLOCKED — cannot be replaced.""" + tool = _make_tool() + new_consolidator = MagicMock() + result = await tool.execute(action="set", key="consolidator", value=new_consolidator) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_dream_blocked(self): + """dream is BLOCKED — cannot be replaced.""" + tool = _make_tool() + new_dream = MagicMock() + result = await tool.execute(action="set", key="dream", value=new_dream) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_auto_compact_blocked(self): + """auto_compact is BLOCKED — cannot be replaced.""" + tool = _make_tool() + new_auto_compact = MagicMock() + result = await tool.execute(action="set", key="auto_compact", value=new_auto_compact) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_context_blocked(self): + """context is BLOCKED — cannot be replaced.""" + tool = _make_tool() + new_context = MagicMock() + result = await tool.execute(action="set", key="context", value=new_context) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_commands_blocked(self): + """commands is BLOCKED — cannot be replaced.""" + tool = _make_tool() + new_commands = MagicMock() + result = await tool.execute(action="set", key="commands", value=new_commands) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_workspace_allowed(self): + """workspace was READONLY in v1, now freely modifiable.""" + tool = _make_tool() + result = await tool.execute(action="set", key="workspace", value="/new/path") + assert "Set workspace" in result + + @pytest.mark.asyncio + async def test_modify_mcp_servers_blocked(self): + """_mcp_servers contains API credentials — must be blocked.""" + tool = _make_tool() + result = await tool.execute(action="set", key="_mcp_servers", value={"evil": "leaked"}) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_mcp_stacks_blocked(self): + """_mcp_stacks holds connection handles — must be blocked.""" + tool = _make_tool() + result = await tool.execute(action="set", key="_mcp_stacks", value={}) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_pending_queues_blocked(self): + """_pending_queues controls message routing — must be blocked.""" + tool = _make_tool() + result = await tool.execute(action="set", key="_pending_queues", value={}) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_session_locks_blocked(self): + """_session_locks controls session isolation — must be blocked.""" + tool = _make_tool() + result = await tool.execute(action="set", key="_session_locks", value={}) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_active_tasks_blocked(self): + """_active_tasks tracks running tasks — must be blocked.""" + tool = _make_tool() + result = await tool.execute(action="set", key="_active_tasks", value={}) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_background_tasks_blocked(self): + """_background_tasks tracks background tasks — must be blocked.""" + tool = _make_tool() + result = await tool.execute(action="set", key="_background_tasks", value=[]) + assert "protected" in result + + @pytest.mark.asyncio + async def test_inspect_mcp_servers_blocked(self): + """_mcp_servers contains credentials — check must be blocked too.""" + tool = _make_tool() + result = await tool.execute(action="check", key="_mcp_servers") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_modify_wrapped_denied(self): + """__wrapped__ allows decorator bypass — must be denied.""" + tool = _make_tool() + result = await tool.execute(action="set", key="__wrapped__", value="evil") + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_closure_denied(self): + """__closure__ exposes function internals — must be denied.""" + tool = _make_tool() + result = await tool.execute(action="set", key="__closure__", value="evil") + assert "protected" in result + + +# --------------------------------------------------------------------------- +# validate_json_safe — element counting +# --------------------------------------------------------------------------- + +class TestValidateJsonSafe: + + def test_single_list_passes(self): + assert MyTool._validate_json_safe(list(range(500))) is None + + def test_deeply_nested_within_limit(self): + value = {"level1": {"level2": {"level3": list(range(100))}}} + assert MyTool._validate_json_safe(value) is None + + +# --------------------------------------------------------------------------- +# unknown action +# --------------------------------------------------------------------------- + +class TestUnknownAction: + + @pytest.mark.asyncio + async def test_unknown_action(self): + tool = _make_tool() + result = await tool.execute(action="explode") + assert "Unknown action" in result + + +# --------------------------------------------------------------------------- +# runtime_vars limits (from code review) +# --------------------------------------------------------------------------- + +class TestRuntimeVarsLimits: + + @pytest.mark.asyncio + async def test_runtime_vars_rejects_at_max_keys(self): + loop = _make_mock_loop() + loop._runtime_vars = {f"key_{i}": i for i in range(64)} + tool = _make_tool(loop) + result = await tool.execute(action="set", key="overflow", value="data") + assert "full" in result + assert "overflow" not in loop._runtime_vars + + @pytest.mark.asyncio + async def test_runtime_vars_allows_update_existing_key_at_max(self): + loop = _make_mock_loop() + loop._runtime_vars = {f"key_{i}": i for i in range(64)} + tool = _make_tool(loop) + result = await tool.execute(action="set", key="key_0", value="updated") + assert "Error" not in result + assert loop._runtime_vars["key_0"] == "updated" + + +# --------------------------------------------------------------------------- +# denied attrs (non-dunder) +# --------------------------------------------------------------------------- + +class TestDeniedAttrs: + + @pytest.mark.asyncio + async def test_modify_denied_non_dunder_blocked(self): + tool = _make_tool() + for attr in ("func_globals", "func_code"): + result = await tool.execute(action="set", key=attr, value="evil") + assert "protected" in result, f"{attr} should be blocked" + + +# --------------------------------------------------------------------------- +# SubagentStatus formatting +# --------------------------------------------------------------------------- + +class TestSubagentStatusFormatting: + + def test_format_single_status(self): + """_format_value should produce a rich multi-line display for a SubagentStatus.""" + from nanobot.agent.subagent import SubagentStatus + + status = SubagentStatus( + task_id="abc12345", + label="read logs and summarize", + task_description="Read the log files and produce a summary", + started_at=time.monotonic() - 12.4, + phase="awaiting_tools", + iteration=3, + tool_events=[ + {"name": "read_file", "status": "ok", "detail": "read app.log"}, + {"name": "grep", "status": "ok", "detail": "searched ERROR"}, + {"name": "exec", "status": "error", "detail": "timeout"}, + ], + usage={"prompt_tokens": 4500, "completion_tokens": 1200}, + ) + result = MyTool._format_value(status) + assert "abc12345" in result + assert "read logs and summarize" in result + assert "awaiting_tools" in result + assert "iteration: 3" in result + assert "read_file(ok)" in result + assert "exec(error)" in result + assert "4500" in result + + def test_format_status_dict(self): + """_format_value should handle dict[str, SubagentStatus] with rich display.""" + from nanobot.agent.subagent import SubagentStatus + + statuses = { + "abc12345": SubagentStatus( + task_id="abc12345", + label="task A", + task_description="Do task A", + started_at=time.monotonic() - 5.0, + phase="awaiting_tools", + iteration=1, + ), + } + result = MyTool._format_value(statuses) + assert "1 subagent(s)" in result + assert "abc12345" in result + assert "task A" in result + + def test_format_empty_status_dict(self): + """Empty dict[str, SubagentStatus] should show 'no running subagents'.""" + result = MyTool._format_value({}) + assert "{}" in result + + def test_format_status_with_error(self): + """Status with error should include the error message.""" + from nanobot.agent.subagent import SubagentStatus + + status = SubagentStatus( + task_id="err00001", + label="failing task", + task_description="A task that fails", + started_at=time.monotonic() - 1.0, + phase="error", + error="Connection refused", + ) + result = MyTool._format_value(status) + assert "error: Connection refused" in result + +# --------------------------------------------------------------------------- +# _SubagentHook after_iteration updates status +# --------------------------------------------------------------------------- + +class TestSubagentHookStatus: + + @pytest.mark.asyncio + async def test_after_iteration_updates_status(self): + """after_iteration should copy iteration, tool_events, usage to status.""" + from nanobot.agent.subagent import SubagentStatus, _SubagentHook + from nanobot.agent.hook import AgentHookContext + + status = SubagentStatus( + task_id="test", + label="test", + task_description="test", + started_at=time.monotonic(), + ) + hook = _SubagentHook("test", status) + + context = AgentHookContext( + iteration=5, + messages=[], + tool_events=[{"name": "read_file", "status": "ok", "detail": "ok"}], + usage={"prompt_tokens": 100, "completion_tokens": 50}, + ) + await hook.after_iteration(context) + + assert status.iteration == 5 + assert len(status.tool_events) == 1 + assert status.tool_events[0]["name"] == "read_file" + assert status.usage == {"prompt_tokens": 100, "completion_tokens": 50} + + @pytest.mark.asyncio + async def test_after_iteration_with_error(self): + """after_iteration should set status.error when context has an error.""" + from nanobot.agent.subagent import SubagentStatus, _SubagentHook + from nanobot.agent.hook import AgentHookContext + + status = SubagentStatus( + task_id="test", + label="test", + task_description="test", + started_at=time.monotonic(), + ) + hook = _SubagentHook("test", status) + + context = AgentHookContext( + iteration=1, + messages=[], + error="something went wrong", + ) + await hook.after_iteration(context) + + assert status.error == "something went wrong" + + @pytest.mark.asyncio + async def test_after_iteration_no_status_is_noop(self): + """after_iteration with no status should be a no-op.""" + from nanobot.agent.subagent import _SubagentHook + from nanobot.agent.hook import AgentHookContext + + hook = _SubagentHook("test") + context = AgentHookContext(iteration=1, messages=[]) + await hook.after_iteration(context) # should not raise + + +# --------------------------------------------------------------------------- +# Checkpoint callback updates status +# --------------------------------------------------------------------------- + +class TestCheckpointCallback: + + @pytest.mark.asyncio + async def test_checkpoint_updates_phase_and_iteration(self): + """The _on_checkpoint callback should update status.phase and iteration.""" + from nanobot.agent.subagent import SubagentStatus + import asyncio + + status = SubagentStatus( + task_id="cp", + label="test", + task_description="test", + started_at=time.monotonic(), + ) + + # Simulate the checkpoint callback as defined in _run_subagent + async def _on_checkpoint(payload: dict) -> None: + status.phase = payload.get("phase", status.phase) + status.iteration = payload.get("iteration", status.iteration) + + await _on_checkpoint({"phase": "awaiting_tools", "iteration": 2}) + assert status.phase == "awaiting_tools" + assert status.iteration == 2 + + await _on_checkpoint({"phase": "tools_completed", "iteration": 3}) + assert status.phase == "tools_completed" + assert status.iteration == 3 + + @pytest.mark.asyncio + async def test_checkpoint_preserves_phase_on_missing_key(self): + """If payload doesn't have 'phase', status.phase should stay unchanged.""" + from nanobot.agent.subagent import SubagentStatus + + status = SubagentStatus( + task_id="cp", + label="test", + task_description="test", + started_at=time.monotonic(), + phase="initializing", + ) + + async def _on_checkpoint(payload: dict) -> None: + status.phase = payload.get("phase", status.phase) + status.iteration = payload.get("iteration", status.iteration) + + await _on_checkpoint({"iteration": 1}) + assert status.phase == "initializing" + assert status.iteration == 1 + + +# --------------------------------------------------------------------------- +# check subagents._task_statuses via dot-path +# NOTE: subagents is now BLOCKED for security, so these tests verify +# that access is properly rejected. +# --------------------------------------------------------------------------- + +class TestInspectTaskStatuses: + + @pytest.mark.asyncio + async def test_inspect_task_statuses_accessible(self): + """subagents is READ_ONLY — check should show subagent statuses.""" + from nanobot.agent.subagent import SubagentStatus + + loop = _make_mock_loop() + loop.subagents._task_statuses = { + "abc12345": SubagentStatus( + task_id="abc12345", + label="read logs", + task_description="Read the log files", + started_at=time.monotonic() - 8.0, + phase="awaiting_tools", + iteration=2, + tool_events=[{"name": "read_file", "status": "ok", "detail": "ok"}], + usage={"prompt_tokens": 500, "completion_tokens": 100}, + ), + } + tool = _make_tool(loop) + result = await tool.execute(action="check", key="subagents._task_statuses") + assert "abc12345" in result + assert "read logs" in result + + @pytest.mark.asyncio + async def test_inspect_single_subagent_status_accessible(self): + """subagents._task_statuses. should return individual SubagentStatus.""" + from nanobot.agent.subagent import SubagentStatus + + loop = _make_mock_loop() + status = SubagentStatus( + task_id="xyz", + label="search code", + task_description="Search the codebase", + started_at=time.monotonic() - 3.0, + phase="done", + iteration=4, + stop_reason="completed", + ) + loop.subagents._task_statuses = {"xyz": status} + tool = _make_tool(loop) + result = await tool.execute(action="check", key="subagents._task_statuses.xyz") + assert "search code" in result + assert "completed" in result + + +# --------------------------------------------------------------------------- +# read-only mode (my_set=False) +# --------------------------------------------------------------------------- + +class TestReadOnlyMode: + + def _make_readonly_tool(self): + loop = _make_mock_loop() + return MyTool(loop=loop, modify_allowed=False) + + @pytest.mark.asyncio + async def test_inspect_allowed_in_readonly(self): + tool = self._make_readonly_tool() + result = await tool.execute(action="check", key="max_iterations") + assert "40" in result + + @pytest.mark.asyncio + async def test_modify_blocked_in_readonly(self): + tool = self._make_readonly_tool() + result = await tool.execute(action="set", key="max_iterations", value=80) + assert "disabled" in result + + def test_description_shows_readonly(self): + tool = self._make_readonly_tool() + assert "READ-ONLY MODE" in tool.description + + def test_description_shows_warning_when_modify_allowed(self): + tool = _make_tool() + assert "IMPORTANT" in tool.description + assert "READ-ONLY" not in tool.description + + +# --------------------------------------------------------------------------- +# runtime vars check fallback (Fix #1: cross-turn memory) +# --------------------------------------------------------------------------- + +class TestRuntimeVarsInspectFallback: + + @pytest.mark.asyncio + async def test_inspect_runtime_var_after_modify(self): + """Design doc scenario: set then check should return the value.""" + tool = _make_tool() + await tool.execute(action="set", key="user_prefers_concise", value=True) + result = await tool.execute(action="check", key="user_prefers_concise") + assert "True" in result + + @pytest.mark.asyncio + async def test_inspect_runtime_var_string(self): + tool = _make_tool() + await tool.execute(action="set", key="current_project", value="nanobot") + result = await tool.execute(action="check", key="current_project") + assert "nanobot" in result + + @pytest.mark.asyncio + async def test_inspect_runtime_var_dict(self): + tool = _make_tool() + await tool.execute(action="set", key="task_meta", value={"step": 2, "total": 5}) + result = await tool.execute(action="check", key="task_meta") + assert "step" in result + assert "2" in result + + @pytest.mark.asyncio + async def test_inspect_nonexistent_still_returns_not_found(self): + tool = _make_tool() + result = await tool.execute(action="check", key="never_set_key_xyz") + assert "not found" in result + + +# --------------------------------------------------------------------------- +# sensitive sub-field blocking (Fix #3: API key leak prevention) +# --------------------------------------------------------------------------- + +class TestSensitiveSubFieldBlocking: + + @pytest.mark.asyncio + async def test_inspect_api_key_blocked(self): + """web_config.search.api_key must not be accessible.""" + tool = _make_tool() + result = await tool.execute(action="check", key="web_config.search.api_key") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_inspect_password_blocked(self): + """Any field named 'password' must be blocked.""" + loop = _make_mock_loop() + loop.some_config = MagicMock() + loop.some_config.password = "hunter2" + tool = _make_tool(loop) + result = await tool.execute(action="check", key="some_config.password") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_inspect_secret_blocked(self): + loop = _make_mock_loop() + loop.vault = MagicMock() + loop.vault.secret = "classified" + tool = _make_tool(loop) + result = await tool.execute(action="check", key="vault.secret") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_inspect_token_blocked(self): + loop = _make_mock_loop() + loop.auth_data = MagicMock() + loop.auth_data.token = "jwt-payload" + tool = _make_tool(loop) + result = await tool.execute(action="check", key="auth_data.token") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_modify_api_key_blocked(self): + """web_config is READ_ONLY, so any set under it is blocked.""" + tool = _make_tool() + result = await tool.execute(action="set", key="web_config.search.api_key", value="evil") + # Blocked either by READ_ONLY (web_config) or sensitive name (api_key) + assert "read-only" in result or "not accessible" in result + + @pytest.mark.asyncio + async def test_modify_password_blocked(self): + loop = _make_mock_loop() + loop.some_config = MagicMock() + tool = _make_tool(loop) + result = await tool.execute(action="set", key="some_config.password", value="evil") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_inspect_non_sensitive_subfield_allowed(self): + """web_config.enable should still be inspectable.""" + tool = _make_tool() + result = await tool.execute(action="check", key="web_config.enable") + assert "True" in result + + @pytest.mark.asyncio + async def test_modify_sensitive_top_level_blocked(self): + """Top-level key matching sensitive name must be blocked for set.""" + tool = _make_tool() + result = await tool.execute(action="set", key="api_key", value="evil") + assert "protected" in result + + +# --------------------------------------------------------------------------- +# security-sensitive attribute protection (Fix #4) +# --------------------------------------------------------------------------- + +class TestSecurityAttributeProtection: + + @pytest.mark.asyncio + async def test_modify_restrict_to_workspace_blocked(self): + """restrict_to_workspace is BLOCKED — cannot be toggled.""" + tool = _make_tool() + result = await tool.execute(action="set", key="restrict_to_workspace", value=True) + assert "protected" in result + + @pytest.mark.asyncio + async def test_modify_exec_config_blocked(self): + """exec_config is READ_ONLY — cannot be modified.""" + tool = _make_tool() + result = await tool.execute(action="set", key="exec_config", value=MagicMock()) + assert "read-only" in result + + @pytest.mark.asyncio + async def test_modify_web_config_blocked(self): + """web_config is READ_ONLY — cannot be modified.""" + tool = _make_tool() + result = await tool.execute(action="set", key="web_config", value=MagicMock()) + assert "read-only" in result + + @pytest.mark.asyncio + async def test_modify_channels_config_blocked(self): + """channels_config is BLOCKED — cannot be modified.""" + tool = _make_tool() + result = await tool.execute(action="set", key="channels_config", value={}) + assert "protected" in result + + @pytest.mark.asyncio + async def test_inspect_restrict_to_workspace_blocked(self): + """restrict_to_workspace is BLOCKED — cannot be inspected.""" + tool = _make_tool() + result = await tool.execute(action="check", key="restrict_to_workspace") + assert "not accessible" in result + + @pytest.mark.asyncio + async def test_inspect_exec_config_allowed(self): + """exec_config is READ_ONLY — check should work.""" + tool = _make_tool() + result = await tool.execute(action="check", key="exec_config") + assert "Error" not in result + + @pytest.mark.asyncio + async def test_inspect_web_config_allowed(self): + """web_config is READ_ONLY — check should work.""" + tool = _make_tool() + result = await tool.execute(action="check", key="web_config") + assert "Error" not in result + + @pytest.mark.asyncio + async def test_modify_exec_config_dotpath_blocked(self): + """exec_config.enable = False should be blocked because exec_config is READ_ONLY.""" + tool = _make_tool() + result = await tool.execute(action="set", key="exec_config.enable", value=False) + assert "read-only" in result + + @pytest.mark.asyncio + async def test_modify_web_config_dotpath_blocked(self): + """web_config.enable = False should be blocked because web_config is READ_ONLY.""" + tool = _make_tool() + result = await tool.execute(action="set", key="web_config.enable", value=False) + assert "read-only" in result + + +# --------------------------------------------------------------------------- +# current iteration count (Fix #2) +# --------------------------------------------------------------------------- + +class TestCurrentIteration: + + @pytest.mark.asyncio + async def test_inspect_current_iteration(self): + tool = _make_tool() + result = await tool.execute(action="check", key="_current_iteration") + assert "0" in result + + @pytest.mark.asyncio + async def test_current_iteration_in_summary(self): + tool = _make_tool() + result = await tool.execute(action="check") + assert "_current_iteration" in result + + @pytest.mark.asyncio + async def test_modify_current_iteration_blocked(self): + """_current_iteration is READ_ONLY — cannot be set manually.""" + tool = _make_tool() + result = await tool.execute(action="set", key="_current_iteration", value=5) + assert "read-only" in result + + +# --------------------------------------------------------------------------- +# _last_usage in check summary (Fix #5) +# --------------------------------------------------------------------------- + +class TestLastUsageInSummary: + + @pytest.mark.asyncio + async def test_last_usage_shown_in_summary(self): + tool = _make_tool() + result = await tool.execute(action="check") + assert "_last_usage" in result + assert "prompt_tokens" in result + + @pytest.mark.asyncio + async def test_last_usage_not_shown_when_empty(self): + loop = _make_mock_loop() + loop._last_usage = {} + tool = _make_tool(loop) + result = await tool.execute(action="check") + assert "_last_usage" not in result + + +# --------------------------------------------------------------------------- +# set_context (audit session tracking) +# --------------------------------------------------------------------------- + +class TestSetContext: + + def test_set_context_stores_channel_and_chat_id(self): + tool = _make_tool() + tool.set_context("feishu", "oc_abc123") + assert tool._channel == "feishu" + assert tool._chat_id == "oc_abc123" diff --git a/tests/tools/test_search_tools.py b/tests/tools/test_search_tools.py index 3153caa45..ee7f61c06 100644 --- a/tests/tools/test_search_tools.py +++ b/tests/tools/test_search_tools.py @@ -3,6 +3,7 @@ from __future__ import annotations import os +import time from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock @@ -10,7 +11,7 @@ from unittest.mock import AsyncMock, MagicMock import pytest from nanobot.agent.loop import AgentLoop -from nanobot.agent.subagent import SubagentManager +from nanobot.agent.subagent import SubagentManager, SubagentStatus from nanobot.agent.tools.search import GlobTool, GrepTool from nanobot.bus.queue import MessageBus @@ -319,7 +320,8 @@ async def test_subagent_registers_grep_and_glob(tmp_path: Path) -> None: mgr.runner.run = fake_run mgr._announce_result = AsyncMock() - await mgr._run_subagent("sub-1", "search task", "label", {"channel": "cli", "chat_id": "direct"}) + status = SubagentStatus(task_id="sub-1", label="label", task_description="search task", started_at=time.monotonic()) + await mgr._run_subagent("sub-1", "search task", "label", {"channel": "cli", "chat_id": "direct"}, status) assert "grep" in captured["tool_names"] assert "glob" in captured["tool_names"]