diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index 50b86de43..7a34cfbb7 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -33,6 +33,7 @@ from nanobot.utils.runtime import ( ensure_nonempty_tool_result, is_blank_text, repeated_external_lookup_error, + repeated_workspace_violation_error, ) _DEFAULT_ERROR_MESSAGE = "Sorry, I encountered an error calling the AI model." @@ -239,6 +240,10 @@ class AgentRunner: stop_reason = "completed" tool_events: list[dict[str, str]] = [] external_lookup_counts: dict[str, int] = {} + # Tracks repeated bypass attempts against the same outside-workspace + # target within this turn. See ``repeated_workspace_violation_error`` + # in ``nanobot.utils.runtime`` for the throttle policy. + workspace_violation_counts: dict[str, int] = {} empty_content_retries = 0 length_recovery_count = 0 had_injections = False @@ -314,6 +319,7 @@ class AgentRunner: spec, tool_calls, external_lookup_counts, + workspace_violation_counts, ) tool_events.extend(new_events) context.tool_results = list(results) @@ -698,20 +704,25 @@ class AgentRunner: spec: AgentRunSpec, tool_calls: list[ToolCallRequest], external_lookup_counts: dict[str, int], + workspace_violation_counts: dict[str, int], ) -> tuple[list[Any], list[dict[str, str]], BaseException | None]: batches = self._partition_tool_batches(spec, tool_calls) tool_results: list[tuple[Any, dict[str, str], BaseException | None]] = [] for batch in batches: if spec.concurrent_tools and len(batch) > 1: batch_results = await asyncio.gather(*( - self._run_tool(spec, tool_call, external_lookup_counts) + self._run_tool( + spec, tool_call, external_lookup_counts, workspace_violation_counts, + ) for tool_call in batch )) tool_results.extend(batch_results) else: batch_results = [] for tool_call in batch: - result = await self._run_tool(spec, tool_call, external_lookup_counts) + result = await self._run_tool( + spec, tool_call, external_lookup_counts, workspace_violation_counts, + ) tool_results.append(result) batch_results.append(result) if isinstance(result[2], AskUserInterrupt): @@ -734,6 +745,7 @@ class AgentRunner: spec: AgentRunSpec, tool_call: ToolCallRequest, external_lookup_counts: dict[str, int], + workspace_violation_counts: dict[str, int], ) -> tuple[Any, dict[str, str], BaseException | None]: hint = "\n\n[Analyze the error above and try a different approach.]" lookup_error = repeated_external_lookup_error( @@ -763,16 +775,20 @@ class AgentRunner: "status": "error", "detail": prep_error.split(": ", 1)[-1][:120], } - if self._is_workspace_violation(prep_error): - logger.warning( - "Tool {} blocked by workspace/safety guard during preparation; aborting turn: {}", - tool_call.name, - prep_error.replace("\n", " ").strip()[:200], - ) - event["detail"] = ("workspace_violation: " - + prep_error.replace("\n", " ").strip())[:160] - return prep_error, event, RuntimeError(prep_error) - return prep_error + hint, event, RuntimeError(prep_error) if spec.fail_on_tool_error else None + handled = self._classify_violation( + raw_text=prep_error, + soft_payload=prep_error + hint, + ssrf_payload=prep_error, + ssrf_error=RuntimeError(prep_error), + event=event, + tool_call=tool_call, + workspace_violation_counts=workspace_violation_counts, + ) + if handled is not None: + return handled + return prep_error + hint, event, ( + RuntimeError(prep_error) if spec.fail_on_tool_error else None + ) try: if tool is not None: result = await tool.execute(**params) @@ -789,18 +805,25 @@ class AgentRunner: if isinstance(exc, AskUserInterrupt): event["status"] = "waiting" return "", event, exc - if self._is_workspace_violation(str(exc)): - logger.warning( - "Tool {} blocked by workspace/safety guard; aborting turn: {}", - tool_call.name, - str(exc).replace("\n", " ").strip()[:200], - ) - event["detail"] = ("workspace_violation: " - + str(exc).replace("\n", " ").strip())[:160] - return f"Error: {type(exc).__name__}: {exc}", event, exc + payload = f"Error: {type(exc).__name__}: {exc}" + handled = self._classify_violation( + raw_text=str(exc), + # Match the legacy behavior here: the exception branch never + # appended the "try a different approach" hint, even on the + # workspace-violation path -- preserve that for callers that + # eyeball the exact tool message. + soft_payload=payload, + ssrf_payload=payload, + ssrf_error=exc, + event=event, + tool_call=tool_call, + workspace_violation_counts=workspace_violation_counts, + ) + if handled is not None: + return handled if spec.fail_on_tool_error: - return f"Error: {type(exc).__name__}: {exc}", event, exc - return f"Error: {type(exc).__name__}: {exc}", event, None + return payload, event, exc + return payload, event, None if isinstance(result, str) and result.startswith("Error"): event = { @@ -808,17 +831,17 @@ class AgentRunner: "status": "error", "detail": result.replace("\n", " ").strip()[:120], } - - # check the outside workspace error and break loop - if self._is_workspace_violation(result): - logger.warning( - "Tool {} blocked by workspace/safety guard; aborting turn: {}", - tool_call.name, - result.replace("\n", " ").strip()[:200], - ) - event["detail"] = ("workspace_violation: " - + result.replace("\n", " ").strip())[:160] - return result, event, RuntimeError(result) + handled = self._classify_violation( + raw_text=result, + soft_payload=result + hint, + ssrf_payload=result, + ssrf_error=RuntimeError(result), + event=event, + tool_call=tool_call, + workspace_violation_counts=workspace_violation_counts, + ) + if handled is not None: + return handled if spec.fail_on_tool_error: return result + hint, event, RuntimeError(result) return result + hint, event, None @@ -831,39 +854,103 @@ class AgentRunner: detail = detail[:120] + "..." return result, {"name": tool_call.name, "status": "ok", "detail": detail}, None - # Markers identifying tool results that represent a *hard* workspace / - # safety boundary rejection -- only these abort the agent loop. - # - # We deliberately keep this list narrow (#3599 / #3605): - # - The first four come from explicit path-resolution checks in - # ``filesystem.py`` and ``shell.py`` that cannot false-positive on user - # payloads -- if you see them, the LLM truly tried to escape the - # workspace. - # - "internal/private url detected" stays fatal because SSRF is a real - # security boundary; allowing the LLM to "retry" would just let it - # poke internal infra with a different URL phrasing. - # - "path traversal detected" and "path outside working dir" are - # intentionally *not* listed: both come from the heuristic - # ``_guard_command`` checks in ``shell.py`` which scan the raw command - # string and routinely false-positive on legitimate constructs (e.g. - # ``2>/dev/null`` redirects, quoted ``..`` arguments to ``sed`` / - # ``find``, paths inside inline scripts). Treating them as fatal - # silently kills user turns (#3599) and prevents the agent from - # self-correcting by trying a different approach (#3605). - _WORKSPACE_BLOCK_MARKERS: tuple[str, ...] = ( + # SSRF rejections remain a hard, non-recoverable safety boundary: a single + # successful internal-URL fetch can leak cloud metadata, so we never let + # the LLM "retry" with a different phrasing of the same target. + _SSRF_MARKER: str = "internal/private url detected" + + # Markers that identify "tried to access something outside the workspace". + # Unlike SSRF these are intentionally *non-fatal* (#3599 / #3605): + # - The structured error message itself tells the model not to bypass. + # - ``repeated_workspace_violation_error`` throttles the loop reported + # in #3493 by escalating after two attempts against the same target. + # - max_iterations is the ultimate ceiling, so we never need to abort. + _WORKSPACE_VIOLATION_MARKERS: tuple[str, ...] = ( "outside the configured workspace", "outside allowed directory", "working_dir is outside", "working_dir could not be resolved", - "internal/private url detected", + "path outside working dir", + "path traversal detected", ) + @classmethod + def _is_ssrf_violation(cls, text: str) -> bool: + return bool(text) and cls._SSRF_MARKER in text.lower() + @classmethod def _is_workspace_violation(cls, text: str) -> bool: + """True when *text* looks like *any* policy boundary rejection. + + Kept as a public-ish hook for callers that need a yes/no signal + (logging, telemetry). The runner itself uses the more specific + ``_is_ssrf_violation`` to decide what is fatal. + """ if not text: return False lowered = text.lower() - return any(marker in lowered for marker in cls._WORKSPACE_BLOCK_MARKERS) + if cls._SSRF_MARKER in lowered: + return True + return any(marker in lowered for marker in cls._WORKSPACE_VIOLATION_MARKERS) + + def _classify_violation( + self, + *, + raw_text: str, + soft_payload: str, + ssrf_payload: str, + ssrf_error: BaseException, + event: dict[str, str], + tool_call: ToolCallRequest, + workspace_violation_counts: dict[str, int], + ) -> tuple[Any, dict[str, str], BaseException | None] | None: + """Apply violation policy to a tool failure, or pass through. + + Returns a fully-formed (payload, event, error) triple when *raw_text* + looks like a policy boundary rejection. Returns ``None`` when the + caller should fall through to its generic per-branch handling. + + - SSRF stays fatal -- a single successful internal fetch can leak + cloud metadata, so retrying with a different URL phrasing is + never acceptable. We mutate ``event`` in place so the caller's + telemetry stays consistent. + - All other workspace-bound rejections become soft tool errors. + Each repeated attempt against the same outside target bumps a + per-turn counter; after the soft retry budget is exhausted we + replace the message body with an explicit "stop trying to bypass + the policy" instruction (see #3493 for the original bypass-loop + that motivated PR #3493's hard-abort, and #3599 / #3605 for why + the hard-abort backfired). + """ + if self._is_ssrf_violation(raw_text): + logger.warning( + "Tool {} blocked by SSRF guard; aborting turn: {}", + tool_call.name, + raw_text.replace("\n", " ").strip()[:200], + ) + event["detail"] = ("workspace_violation: " + + raw_text.replace("\n", " ").strip())[:160] + return ssrf_payload, event, ssrf_error + + if self._is_workspace_violation(raw_text): + escalation = repeated_workspace_violation_error( + tool_call.name, + tool_call.arguments, + workspace_violation_counts, + ) + event["detail"] = ("workspace_violation: " + + raw_text.replace("\n", " ").strip())[:160] + if escalation is not None: + logger.warning( + "Tool {} hit workspace boundary repeatedly; escalating hint", + tool_call.name, + ) + event["detail"] = ("workspace_violation_escalated: " + + raw_text.replace("\n", " ").strip())[:160] + return escalation, event, None + return soft_payload, event, None + + return None async def _emit_checkpoint( self, diff --git a/nanobot/agent/tools/filesystem.py b/nanobot/agent/tools/filesystem.py index 587a149f2..8091e7670 100644 --- a/nanobot/agent/tools/filesystem.py +++ b/nanobot/agent/tools/filesystem.py @@ -14,6 +14,13 @@ from nanobot.utils.helpers import build_image_content_blocks, detect_image_mime from nanobot.config.paths import get_media_dir +_FS_WORKSPACE_BOUNDARY_NOTE = ( + " (this is a hard policy boundary, not a transient failure; " + "do not retry with shell tricks or alternative tools, and ask " + "the user how to proceed if the resource is genuinely required)" +) + + def _resolve_path( path: str, workspace: Path | None = None, @@ -29,7 +36,10 @@ def _resolve_path( media_path = get_media_dir().resolve() all_dirs = [allowed_dir] + [media_path] + (extra_allowed_dirs or []) if not any(_is_under(resolved, d) for d in all_dirs): - raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") + raise PermissionError( + f"Path {path} is outside allowed directory {allowed_dir}" + + _FS_WORKSPACE_BOUNDARY_NOTE + ) return resolved diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index 177176d25..2ed6b981e 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -19,6 +19,20 @@ from nanobot.config.paths import get_media_dir _IS_WINDOWS = sys.platform == "win32" +# Appended to every workspace / safety guard rejection so the LLM is told +# explicitly that this is a policy boundary (not a transient failure) and +# that bypass loops will not change the answer. The throttle in +# ``nanobot.utils.runtime.repeated_workspace_violation_error`` upgrades +# this further once the model keeps targeting the same path. +_WORKSPACE_BOUNDARY_NOTE = ( + "\n\nNote: this is a hard policy boundary, not a transient failure. " + "Do NOT retry with shell tricks (symlinks, base64 piping, alternative " + "tools, working_dir overrides). If the user genuinely needs this " + "resource, tell them you cannot reach it under the current " + "restrict_to_workspace policy and ask how to proceed." +) + + @tool_parameters( tool_parameters_schema( command=StringSchema("The shell command to execute"), @@ -129,9 +143,15 @@ class ExecTool(Tool): requested = Path(cwd).expanduser().resolve() workspace_root = Path(self.working_dir).expanduser().resolve() except Exception: - return "Error: working_dir could not be resolved" + return ( + "Error: working_dir could not be resolved" + + _WORKSPACE_BOUNDARY_NOTE + ) if requested != workspace_root and workspace_root not in requested.parents: - return "Error: working_dir is outside the configured workspace" + return ( + "Error: working_dir is outside the configured workspace" + + _WORKSPACE_BOUNDARY_NOTE + ) guard_error = self._guard_command(command, cwd) if guard_error: @@ -305,11 +325,18 @@ class ExecTool(Tool): from nanobot.security.network import contains_internal_url if contains_internal_url(cmd): + # SSRF: stay short and direct. The runner classifies this + # marker as a hard, non-recoverable boundary, so the + # _WORKSPACE_BOUNDARY_NOTE policy text doesn't apply here -- + # we don't want the model to retry at all. return "Error: Command blocked by safety guard (internal/private URL detected)" if self.restrict_to_workspace: if "..\\" in cmd or "../" in cmd: - return "Error: Command blocked by safety guard (path traversal detected)" + return ( + "Error: Command blocked by safety guard (path traversal detected)" + + _WORKSPACE_BOUNDARY_NOTE + ) cwd_path = Path(cwd).resolve() @@ -335,7 +362,10 @@ class ExecTool(Tool): and media_path not in p.parents and p != media_path ): - return "Error: Command blocked by safety guard (path outside working dir)" + return ( + "Error: Command blocked by safety guard (path outside working dir)" + + _WORKSPACE_BOUNDARY_NOTE + ) return None diff --git a/nanobot/utils/runtime.py b/nanobot/utils/runtime.py index 39822fd48..f5c5f994a 100644 --- a/nanobot/utils/runtime.py +++ b/nanobot/utils/runtime.py @@ -2,6 +2,8 @@ from __future__ import annotations +import re +from pathlib import Path from typing import Any from loguru import logger @@ -10,6 +12,14 @@ from nanobot.utils.helpers import stringify_text_blocks _MAX_REPEAT_EXTERNAL_LOOKUPS = 2 +# Workspace-violation throttle: how many times the LLM is allowed to bump +# against the same outside-workspace target *within a single turn* before the +# tool result is escalated with a hard "stop trying to bypass the policy" +# instruction. Two free attempts give the model room to e.g. read_file then +# fall back to exec without immediately escalating; the third attempt at the +# same target is treated as a clear bypass loop. +_MAX_REPEAT_WORKSPACE_VIOLATIONS = 2 + EMPTY_FINAL_RESPONSE_MESSAGE = ( "I completed the tool steps but couldn't produce a final answer. " "Please try again or narrow the task." @@ -95,3 +105,108 @@ def repeated_external_lookup_error( "Error: repeated external lookup blocked. " "Use the results you already have to answer, or try a meaningfully different source." ) + + +# --- Workspace-violation throttle -------------------------------------------- +# +# When ``restrict_to_workspace`` is on and the LLM tries to read or exec +# something outside of the workspace, we want to *tell* the model that it +# hit a hard policy boundary -- not silently abort the whole turn and not +# allow it to spin forever swapping ``read_file`` for ``exec cat`` for +# ``python -c open(...)`` (the actual loop reported in #3493). The strategy +# is two-fold: +# +# 1. Each individual guard error already includes structured instructions +# that tell the model "don't try to bypass this; ask the user for help". +# 2. We additionally count how many times the *same outside target* has +# been refused within the current turn. After two free attempts the +# third refusal swaps in a much more forceful message that quotes the +# target path and explicitly orders the model to stop and surface the +# boundary back to the user. The model is still free to do something +# else (different target, different question) -- only the bypass loop +# is interrupted. +# +# This intentionally does *not* fatal-abort the turn: max_iterations and +# the empty-final-response retries already provide the ultimate ceiling +# for runaway loops, and aborting is what produced the silent-hang bug +# in #3605 in the first place. + +_OUTSIDE_PATH_PATTERN = re.compile(r"(?:^|[\s|>'\"])((?:/[^\s\"'>;|<]+)|(?:~[^\s\"'>;|<]+))") + + +def workspace_violation_signature( + tool_name: str, + arguments: dict[str, Any], +) -> str | None: + """Return a stable signature for the outside-workspace target a tool tried. + + The signature is shared across tool names so that the LLM cannot bypass + the throttle by switching from ``read_file`` to ``exec cat`` to + ``python -c open(...)`` against the same path. Returns ``None`` when + the call has no obvious outside target (e.g. SSRF rejections, deny + pattern hits, or tools whose argument shape we don't understand). + """ + for key in ("path", "file_path", "target", "source", "destination"): + val = arguments.get(key) + if isinstance(val, str) and val.strip(): + return _normalize_violation_target(val.strip()) + + if tool_name in {"exec", "shell"}: + cmd = str(arguments.get("command") or "").strip() + if cmd: + match = _OUTSIDE_PATH_PATTERN.search(cmd) + if match: + return _normalize_violation_target(match.group(1)) + cwd = str(arguments.get("working_dir") or "").strip() + if cwd: + return _normalize_violation_target(cwd) + + return None + + +def _normalize_violation_target(raw: str) -> str: + """Normalize *raw* path so that equivalent spellings collide on the same key.""" + try: + normalized = str(Path(raw).expanduser().resolve()) + except Exception: + normalized = raw + return f"violation:{normalized}".lower() + + +def repeated_workspace_violation_error( + tool_name: str, + arguments: dict[str, Any], + seen_counts: dict[str, int], +) -> str | None: + """Return an escalated error string after repeated bypass attempts. + + Returns ``None`` while the LLM is still within the soft retry budget -- + callers should fall back to the tool's own error message in that case. + Once the budget is exceeded, returns a hard "stop trying" instruction + that quotes the offending target. Throttle state lives in + *seen_counts* (a per-turn dict), so the budget naturally resets across + turns without persisting LLM-controlled keys. + """ + signature = workspace_violation_signature(tool_name, arguments) + if signature is None: + return None + count = seen_counts.get(signature, 0) + 1 + seen_counts[signature] = count + if count <= _MAX_REPEAT_WORKSPACE_VIOLATIONS: + return None + logger.warning( + "Escalating repeated workspace bypass attempt {} (attempt {})", + signature[:160], + count, + ) + target = signature.split("violation:", 1)[1] if "violation:" in signature else signature + return ( + "Error: refusing repeated workspace-bypass attempts.\n" + f"You have tried to access '{target}' (or an equivalent path) " + f"{count} times in this turn. This is a hard policy boundary -- " + "switching tools, shell tricks, working_dir overrides, symlinks, " + "or base64 piping will NOT change the answer. Stop retrying. " + "If the user genuinely needs this resource, tell them you cannot " + "access it and ask how they want to proceed (e.g. copy the file " + "into the workspace, or disable restrict_to_workspace for this run)." + ) diff --git a/tests/agent/test_runner.py b/tests/agent/test_runner.py index 86bb8f1bf..09d1dbfd5 100644 --- a/tests/agent/test_runner.py +++ b/tests/agent/test_runner.py @@ -313,21 +313,33 @@ async def test_runner_returns_structured_tool_error(): @pytest.mark.asyncio -async def test_runner_stops_on_workspace_violation_without_fail_on_tool_error(): +async def test_runner_does_not_abort_on_workspace_violation_anymore(): + """v2 behavior: workspace-bound rejections are *soft* tool errors. + + Previously (PR #3493) any workspace boundary error became a fatal + RuntimeError that aborted the turn. That silently killed legitimate + workspace commands once the heuristic guard misfired (#3599 #3605), so + we now hand the error back to the LLM as a recoverable tool result and + rely on ``repeated_workspace_violation_error`` to throttle bypass loops. + """ from nanobot.agent.runner import AgentRunSpec, AgentRunner provider = MagicMock() provider.chat_with_retry = AsyncMock(side_effect=[ LLMResponse( - content="working", - tool_calls=[ToolCallRequest(id="call_1", name="read_file", arguments={"path": "/tmp/outside.md"})], + content="trying outside", + tool_calls=[ToolCallRequest( + id="call_1", name="read_file", arguments={"path": "/tmp/outside.md"}, + )], ), - LLMResponse(content="should not continue", tool_calls=[]), + LLMResponse(content="ok, telling the user instead", tool_calls=[]), ]) tools = MagicMock() tools.get_definitions.return_value = [] tools.execute = AsyncMock( - side_effect=PermissionError("Path /tmp/outside.md is outside allowed directory /workspace") + side_effect=PermissionError( + "Path /tmp/outside.md is outside allowed directory /workspace" + ) ) runner = AgentRunner(provider) @@ -336,71 +348,92 @@ async def test_runner_stops_on_workspace_violation_without_fail_on_tool_error(): initial_messages=[], tools=tools, model="test-model", - max_iterations=2, + max_iterations=3, max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, )) - assert provider.chat_with_retry.await_count == 1 - assert result.stop_reason == "tool_error" - assert "outside allowed directory" in (result.error or "") - assert result.tool_events == [ - { - "name": "read_file", - "status": "error", - "detail": "workspace_violation: Path /tmp/outside.md is outside allowed directory /workspace", - } - ] + assert provider.chat_with_retry.await_count == 2, ( + "workspace violation must NOT short-circuit the loop" + ) + assert result.stop_reason != "tool_error" + assert result.error is None + assert result.final_content == "ok, telling the user instead" + assert result.tool_events and result.tool_events[0]["status"] == "error" + # Detail still carries the workspace_violation breadcrumb for telemetry, + # but the runner did not raise. + assert "workspace_violation" in result.tool_events[0]["detail"] -def test_is_workspace_violation_recognizes_ssrf_block(): - """Internal/private URL block must be classified as a fatal workspace violation. +def test_is_ssrf_violation_remains_fatal(): + """SSRF rejections are the only marker that stays turn-fatal. - Regression guard: the deny/allowlist filter messages were intentionally split - out of `_WORKSPACE_BLOCK_MARKERS` so the LLM can retry, but SSRF rejections - are a hard security boundary and must remain fatal. + A single successful internal-URL fetch can leak cloud metadata, so we + never let the LLM "retry" with a different URL phrasing -- contrast + this with workspace-bound rejections which are soft + throttled in v2. """ from nanobot.agent.runner import AgentRunner ssrf_msg = "Error: Command blocked by safety guard (internal/private URL detected)" - assert AgentRunner._is_workspace_violation(ssrf_msg) is True + assert AgentRunner._is_ssrf_violation(ssrf_msg) is True - # Sanity: deny/allowlist filter messages are deliberately *not* fatal. - assert AgentRunner._is_workspace_violation( - "Error: Command blocked by deny pattern filter" - ) is False - assert AgentRunner._is_workspace_violation( - "Error: Command blocked by allowlist filter (not in allowlist)" - ) is False - - -def test_is_workspace_violation_does_not_fatal_on_shell_guard_heuristics(): - """#3599 / #3605 regression: shell guard heuristics must NOT be fatal. - - ``path outside working dir`` and ``path traversal detected`` are produced - by best-effort string scans inside ``ExecTool._guard_command`` -- they - routinely false-positive on idiomatic constructs (``2>/dev/null``, - ``sed 's|x|../y|g'``) and should be surfaced to the LLM as recoverable - tool errors so it can switch tactics, not abort the whole turn. - """ - from nanobot.agent.runner import AgentRunner - - assert AgentRunner._is_workspace_violation( + # Workspace-bound markers are NOT classified as SSRF. + assert AgentRunner._is_ssrf_violation( "Error: Command blocked by safety guard (path outside working dir)" ) is False - assert AgentRunner._is_workspace_violation( - "Error: Command blocked by safety guard (path traversal detected)" + assert AgentRunner._is_ssrf_violation( + "Path /tmp/x is outside allowed directory /ws" + ) is False + # Deny / allowlist filter messages stay non-fatal too. + assert AgentRunner._is_ssrf_violation( + "Error: Command blocked by deny pattern filter" ) is False @pytest.mark.asyncio -async def test_runner_lets_llm_recover_from_shell_guard_path_outside(): - """End-to-end: a guard-blocked exec is a soft tool error, not a turn-fatal. +async def test_runner_aborts_on_ssrf_violation(): + """SSRF still fatal-aborts the turn even though workspace ones are soft.""" + from nanobot.agent.runner import AgentRunSpec, AgentRunner - Reporter scenario: a previous PR turned ``path outside working dir`` into - a turn-fatal RuntimeError, so when the false-positive guard fired the - user got no further iterations and (depending on channel) a silent hang. - After narrowing the marker list, the runner must hand the error back to - the LLM and let the next iteration succeed normally. + provider = MagicMock() + provider.chat_with_retry = AsyncMock(side_effect=[ + LLMResponse( + content="curl-ing metadata", + tool_calls=[ToolCallRequest( + id="call_ssrf", + name="exec", + arguments={"command": "curl http://169.254.169.254"}, + )], + ), + LLMResponse(content="should NOT be reached", tool_calls=[]), + ]) + tools = MagicMock() + tools.get_definitions.return_value = [] + tools.execute = AsyncMock(return_value=( + "Error: Command blocked by safety guard (internal/private URL detected)" + )) + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[], + tools=tools, + model="test-model", + max_iterations=3, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + )) + + assert provider.chat_with_retry.await_count == 1, "SSRF must abort immediately" + assert result.stop_reason == "tool_error" + assert "internal/private url detected" in (result.error or "").lower() + + +@pytest.mark.asyncio +async def test_runner_lets_llm_recover_from_shell_guard_path_outside(): + """Reporter scenario for #3599 / #3605 -- guard hit, agent recovers. + + The shell `_guard_command` heuristic fires on `2>/dev/null`-style + redirects and other shell idioms. Before v2 that abort'd the whole + turn (silent hang on Telegram per #3605); now the LLM gets the soft + error back and can finalize on the next iteration. """ from nanobot.agent.runner import AgentRunSpec, AgentRunner @@ -443,7 +476,66 @@ async def test_runner_lets_llm_recover_from_shell_guard_path_outside(): assert result.error is None assert result.final_content == "recovered final answer" assert result.tool_events and result.tool_events[0]["status"] == "error" - assert "workspace_violation" not in result.tool_events[0]["detail"] + # v2: detail keeps the breadcrumb but the runner did not raise. + assert "workspace_violation" in result.tool_events[0]["detail"] + + +@pytest.mark.asyncio +async def test_runner_throttles_repeated_workspace_bypass_attempts(): + """#3493 motivation: stop the LLM bypass loop without aborting the turn. + + LLM keeps switching tools (read_file -> exec cat -> python -c open(...)) + against the same outside path. After the soft retry budget is exhausted + the runner replaces the tool result with a hard "stop trying" message + so the model finally gives up and surfaces the boundary to the user. + """ + from nanobot.agent.runner import AgentRunSpec, AgentRunner + + bypass_attempts = [ + ToolCallRequest( + id=f"a{i}", name="exec", + arguments={"command": f"cat /Users/x/Downloads/01.md # try {i}"}, + ) + for i in range(4) + ] + responses: list[LLMResponse] = [ + LLMResponse(content=f"try {i}", tool_calls=[bypass_attempts[i]]) + for i in range(4) + ] + responses.append(LLMResponse(content="ok telling user", tool_calls=[])) + + provider = MagicMock() + provider.chat_with_retry = AsyncMock(side_effect=responses) + tools = MagicMock() + tools.get_definitions.return_value = [] + tools.execute = AsyncMock( + return_value="Error: Command blocked by safety guard (path outside working dir)" + ) + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[], + tools=tools, + model="test-model", + max_iterations=10, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + )) + + # All 4 bypass attempts surface to the LLM (no fatal abort), and the + # runner finally completes once the LLM stops asking. + assert result.stop_reason != "tool_error" + assert result.error is None + assert result.final_content == "ok telling user" + # The third+ attempts must have been escalated -- look at the events. + escalated = [ + ev for ev in result.tool_events + if ev["status"] == "error" + and ev["detail"].startswith("workspace_violation_escalated:") + ] + assert escalated, ( + "expected at least one escalated workspace_violation event, got: " + f"{result.tool_events}" + ) @pytest.mark.asyncio @@ -924,6 +1016,7 @@ async def test_runner_batches_read_only_tools_before_exclusive_work(): ToolCallRequest(id="rw1", name="write_a", arguments={}), ], {}, + {}, ) assert shared_events[0:2] == ["start:read_a", "start:read_b"] @@ -968,6 +1061,7 @@ async def test_runner_does_not_batch_exclusive_read_only_tools(): ToolCallRequest(id="ro2", name="read_b", arguments={}), ], {}, + {}, ) assert shared_events[0] == "start:read_a" diff --git a/tests/tools/test_tool_validation.py b/tests/tools/test_tool_validation.py index 73e3b4f2a..92a3f8f50 100644 --- a/tests/tools/test_tool_validation.py +++ b/tests/tools/test_tool_validation.py @@ -242,13 +242,21 @@ def test_exec_extract_absolute_paths_captures_quoted_paths() -> None: def test_exec_guard_blocks_home_path_outside_workspace(tmp_path) -> None: tool = ExecTool(restrict_to_workspace=True) error = tool._guard_command("cat ~/.nanobot/config.json", str(tmp_path)) - assert error == "Error: Command blocked by safety guard (path outside working dir)" + assert error is not None + assert error.startswith( + "Error: Command blocked by safety guard (path outside working dir)" + ) + assert "hard policy boundary" in error def test_exec_guard_blocks_quoted_home_path_outside_workspace(tmp_path) -> None: tool = ExecTool(restrict_to_workspace=True) error = tool._guard_command('cat "~/.nanobot/config.json"', str(tmp_path)) - assert error == "Error: Command blocked by safety guard (path outside working dir)" + assert error is not None + assert error.startswith( + "Error: Command blocked by safety guard (path outside working dir)" + ) + assert "hard policy boundary" in error def test_exec_guard_allows_media_path_outside_workspace(tmp_path, monkeypatch) -> None: @@ -300,7 +308,11 @@ def test_exec_guard_blocks_windows_drive_root_outside_workspace(monkeypatch) -> tool = ExecTool(restrict_to_workspace=True) error = tool._guard_command("dir E:\\", "E:\\workspace") - assert error == "Error: Command blocked by safety guard (path outside working dir)" + assert error is not None + assert error.startswith( + "Error: Command blocked by safety guard (path outside working dir)" + ) + assert "hard policy boundary" in error # --- cast_params tests --- diff --git a/tests/utils/test_workspace_violation_throttle.py b/tests/utils/test_workspace_violation_throttle.py new file mode 100644 index 000000000..a0fb059e1 --- /dev/null +++ b/tests/utils/test_workspace_violation_throttle.py @@ -0,0 +1,120 @@ +"""Tests for repeated_workspace_violation throttle and signature.""" + +from __future__ import annotations + +from nanobot.utils.runtime import ( + repeated_workspace_violation_error, + workspace_violation_signature, +) + + +def test_signature_for_filesystem_tools_uses_path_argument(): + sig_a = workspace_violation_signature( + "read_file", {"path": "/Users/x/Downloads/01.md"} + ) + sig_b = workspace_violation_signature( + "write_file", {"path": "/Users/x/Downloads/01.md"} + ) + sig_c = workspace_violation_signature( + "edit_file", {"file_path": "/Users/x/Downloads/01.md"} + ) + + assert sig_a is not None + assert sig_a == sig_b == sig_c, ( + "the throttle must collapse equivalent paths across different tools " + "so the LLM cannot bypass it by switching tool" + ) + assert "/users/x/downloads/01.md" in sig_a + + +def test_signature_for_exec_extracts_first_absolute_path_in_command(): + sig = workspace_violation_signature( + "exec", + {"command": "cat /Users/x/Downloads/01.md && echo done"}, + ) + assert sig is not None + assert "/users/x/downloads/01.md" in sig + + +def test_signature_collides_across_filesystem_and_exec_for_same_target(): + """LLM bypass loops jump tools (read_file -> exec cat). Throttle must + treat both attempts as targeting the same outside resource.""" + fs_sig = workspace_violation_signature( + "read_file", {"path": "/Users/x/Downloads/01.md"} + ) + exec_sig = workspace_violation_signature( + "exec", {"command": "cat /Users/x/Downloads/01.md"} + ) + assert fs_sig == exec_sig + + +def test_signature_falls_back_to_working_dir_when_no_absolute_in_command(): + sig = workspace_violation_signature( + "exec", + {"command": "ls -la", "working_dir": "/etc"}, + ) + assert sig is not None + assert "/etc" in sig + + +def test_signature_is_none_for_unknown_tool_with_no_path(): + assert workspace_violation_signature("web_search", {"query": "anything"}) is None + assert workspace_violation_signature("exec", {"command": "echo hello"}) is None + + +def test_repeated_workspace_violation_returns_none_within_budget(): + counts: dict[str, int] = {} + arguments = {"path": "/Users/x/Downloads/01.md"} + + assert repeated_workspace_violation_error("read_file", arguments, counts) is None + assert repeated_workspace_violation_error("read_file", arguments, counts) is None + + +def test_repeated_workspace_violation_escalates_after_third_attempt(): + counts: dict[str, int] = {} + arguments = {"path": "/Users/x/Downloads/01.md"} + + repeated_workspace_violation_error("read_file", arguments, counts) + repeated_workspace_violation_error("read_file", arguments, counts) + third = repeated_workspace_violation_error("read_file", arguments, counts) + + assert third is not None + assert "refusing repeated workspace-bypass" in third + assert "/users/x/downloads/01.md" in third + assert "ask how they want to proceed" in third + + +def test_repeated_workspace_violation_independent_per_target(): + """Different outside paths must each get their own retry budget.""" + counts: dict[str, int] = {} + + repeated_workspace_violation_error( + "read_file", {"path": "/Users/x/Downloads/01.md"}, counts, + ) + repeated_workspace_violation_error( + "read_file", {"path": "/Users/x/Downloads/01.md"}, counts, + ) + # Different target, fresh budget. + assert repeated_workspace_violation_error( + "read_file", {"path": "/Users/x/Documents/notes.md"}, counts, + ) is None + + +def test_repeated_workspace_violation_collapses_tool_switching(): + """LLM switches from read_file to exec cat then to python -c open(...) + against the same path; the throttle must escalate on the third attempt.""" + counts: dict[str, int] = {} + + repeated_workspace_violation_error( + "read_file", {"path": "/Users/x/Downloads/01.md"}, counts, + ) + repeated_workspace_violation_error( + "exec", {"command": "cat /Users/x/Downloads/01.md"}, counts, + ) + third = repeated_workspace_violation_error( + "exec", + {"command": "python3 -c \"open('/Users/x/Downloads/01.md').read()\""}, + counts, + ) + assert third is not None + assert "refusing repeated workspace-bypass" in third