From 2a7433b7ecf92963091ab80fbfb83c0da5dc4be0 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 3 May 2026 17:16:42 +0000 Subject: [PATCH] chore(runner): tighten workspace guard comments and Windows tests Keep the workspace-boundary changes easier to review by trimming long explanatory comments down to short local notes. Also make the #3599 POSIX command regression skip on Windows and normalize workspace violation signatures to POSIX separators so the throttle tests are platform-stable. Tests: - uv run pytest tests/tools/test_exec_security.py tests/utils/test_workspace_violation_throttle.py -q - uv run pytest -q Co-authored-by: Cursor --- nanobot/agent/runner.py | 62 ++++++++----------------------- nanobot/agent/tools/shell.py | 26 ++----------- nanobot/utils/runtime.py | 54 +++------------------------ tests/tools/test_exec_security.py | 2 + 4 files changed, 28 insertions(+), 116 deletions(-) diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index 7a34cfbb7..c2e15bf8a 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -240,9 +240,7 @@ class AgentRunner: stop_reason = "completed" tool_events: list[dict[str, str]] = [] external_lookup_counts: dict[str, int] = {} - # Tracks repeated bypass attempts against the same outside-workspace - # target within this turn. See ``repeated_workspace_violation_error`` - # in ``nanobot.utils.runtime`` for the throttle policy. + # Per-turn throttle for repeated attempts against the same outside target. workspace_violation_counts: dict[str, int] = {} empty_content_retries = 0 length_recovery_count = 0 @@ -808,10 +806,7 @@ class AgentRunner: payload = f"Error: {type(exc).__name__}: {exc}" handled = self._classify_violation( raw_text=str(exc), - # Match the legacy behavior here: the exception branch never - # appended the "try a different approach" hint, even on the - # workspace-violation path -- preserve that for callers that - # eyeball the exact tool message. + # Preserve legacy exception payloads without the retry hint. soft_payload=payload, ssrf_payload=payload, ssrf_error=exc, @@ -854,17 +849,10 @@ class AgentRunner: detail = detail[:120] + "..." return result, {"name": tool_call.name, "status": "ok", "detail": detail}, None - # SSRF rejections remain a hard, non-recoverable safety boundary: a single - # successful internal-URL fetch can leak cloud metadata, so we never let - # the LLM "retry" with a different phrasing of the same target. + # SSRF remains fatal; workspace path boundaries are soft + throttled. _SSRF_MARKER: str = "internal/private url detected" - # Markers that identify "tried to access something outside the workspace". - # Unlike SSRF these are intentionally *non-fatal* (#3599 / #3605): - # - The structured error message itself tells the model not to bypass. - # - ``repeated_workspace_violation_error`` throttles the loop reported - # in #3493 by escalating after two attempts against the same target. - # - max_iterations is the ultimate ceiling, so we never need to abort. + # Non-SSRF boundary markers returned to the LLM as recoverable tool errors. _WORKSPACE_VIOLATION_MARKERS: tuple[str, ...] = ( "outside the configured workspace", "outside allowed directory", @@ -880,12 +868,7 @@ class AgentRunner: @classmethod def _is_workspace_violation(cls, text: str) -> bool: - """True when *text* looks like *any* policy boundary rejection. - - Kept as a public-ish hook for callers that need a yes/no signal - (logging, telemetry). The runner itself uses the more specific - ``_is_ssrf_violation`` to decide what is fatal. - """ + """True when *text* looks like any policy boundary rejection.""" if not text: return False lowered = text.lower() @@ -904,32 +887,14 @@ class AgentRunner: tool_call: ToolCallRequest, workspace_violation_counts: dict[str, int], ) -> tuple[Any, dict[str, str], BaseException | None] | None: - """Apply violation policy to a tool failure, or pass through. - - Returns a fully-formed (payload, event, error) triple when *raw_text* - looks like a policy boundary rejection. Returns ``None`` when the - caller should fall through to its generic per-branch handling. - - - SSRF stays fatal -- a single successful internal fetch can leak - cloud metadata, so retrying with a different URL phrasing is - never acceptable. We mutate ``event`` in place so the caller's - telemetry stays consistent. - - All other workspace-bound rejections become soft tool errors. - Each repeated attempt against the same outside target bumps a - per-turn counter; after the soft retry budget is exhausted we - replace the message body with an explicit "stop trying to bypass - the policy" instruction (see #3493 for the original bypass-loop - that motivated PR #3493's hard-abort, and #3599 / #3605 for why - the hard-abort backfired). - """ + """Classify safety-boundary failures, or return ``None`` to pass through.""" if self._is_ssrf_violation(raw_text): logger.warning( "Tool {} blocked by SSRF guard; aborting turn: {}", tool_call.name, raw_text.replace("\n", " ").strip()[:200], ) - event["detail"] = ("workspace_violation: " - + raw_text.replace("\n", " ").strip())[:160] + event["detail"] = self._event_detail("workspace_violation: ", raw_text) return ssrf_payload, event, ssrf_error if self._is_workspace_violation(raw_text): @@ -938,20 +903,25 @@ class AgentRunner: tool_call.arguments, workspace_violation_counts, ) - event["detail"] = ("workspace_violation: " - + raw_text.replace("\n", " ").strip())[:160] + event["detail"] = self._event_detail("workspace_violation: ", raw_text) if escalation is not None: logger.warning( "Tool {} hit workspace boundary repeatedly; escalating hint", tool_call.name, ) - event["detail"] = ("workspace_violation_escalated: " - + raw_text.replace("\n", " ").strip())[:160] + event["detail"] = self._event_detail( + "workspace_violation_escalated: ", + raw_text, + ) return escalation, event, None return soft_payload, event, None return None + @staticmethod + def _event_detail(prefix: str, text: str, limit: int = 160) -> str: + return (prefix + text.replace("\n", " ").strip())[:limit] + async def _emit_checkpoint( self, spec: AgentRunSpec, diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index 2ed6b981e..a05293aae 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -19,11 +19,7 @@ from nanobot.config.paths import get_media_dir _IS_WINDOWS = sys.platform == "win32" -# Appended to every workspace / safety guard rejection so the LLM is told -# explicitly that this is a policy boundary (not a transient failure) and -# that bypass loops will not change the answer. The throttle in -# ``nanobot.utils.runtime.repeated_workspace_violation_error`` upgrades -# this further once the model keeps targeting the same path. +# Policy note appended to recoverable workspace-boundary guard errors. _WORKSPACE_BOUNDARY_NOTE = ( "\n\nNote: this is a hard policy boundary, not a transient failure. " "Do NOT retry with shell tricks (symlinks, base64 piping, alternative " @@ -97,10 +93,7 @@ class ExecTool(Tool): _MAX_TIMEOUT = 600 _MAX_OUTPUT = 10_000 - # Kernel device files that are universally safe as stdio redirect targets - # (e.g. ``cmd 2>/dev/null``). Without this allow-list the workspace guard - # treats them as ``path outside working dir`` and the LLM ends up unable - # to silence stderr inside the workspace (#3599). + # Kernel device files safe as stdio redirect targets (#3599). _BENIGN_DEVICE_PATHS: frozenset[str] = frozenset({ "/dev/null", "/dev/zero", @@ -325,10 +318,7 @@ class ExecTool(Tool): from nanobot.security.network import contains_internal_url if contains_internal_url(cmd): - # SSRF: stay short and direct. The runner classifies this - # marker as a hard, non-recoverable boundary, so the - # _WORKSPACE_BOUNDARY_NOTE policy text doesn't apply here -- - # we don't want the model to retry at all. + # SSRF stays fatal in the runner, so keep this marker direct. return "Error: Command blocked by safety guard (internal/private URL detected)" if self.restrict_to_workspace: @@ -371,15 +361,7 @@ class ExecTool(Tool): @classmethod def _is_benign_device_path(cls, path: str) -> bool: - """Return True when *path* is a kernel device file we should never block. - - Treats ``/dev/null``, the standard streams, ``/dev/random``, etc. as - always-safe targets so that idiomatic stdio plumbing such as - ``cmd 2>/dev/null`` or ``echo done >/dev/stderr`` is not flagged as a - workspace violation regardless of the configured working directory. - Also accepts ``/dev/fd/N`` because those are per-process aliases for - already-open file descriptors and never escape the workspace. - """ + """Return True for kernel device files that should never be workspace-blocked.""" if path in cls._BENIGN_DEVICE_PATHS: return True return path.startswith("/dev/fd/") diff --git a/nanobot/utils/runtime.py b/nanobot/utils/runtime.py index f5c5f994a..4157b396f 100644 --- a/nanobot/utils/runtime.py +++ b/nanobot/utils/runtime.py @@ -12,12 +12,7 @@ from nanobot.utils.helpers import stringify_text_blocks _MAX_REPEAT_EXTERNAL_LOOKUPS = 2 -# Workspace-violation throttle: how many times the LLM is allowed to bump -# against the same outside-workspace target *within a single turn* before the -# tool result is escalated with a hard "stop trying to bypass the policy" -# instruction. Two free attempts give the model room to e.g. read_file then -# fall back to exec without immediately escalating; the third attempt at the -# same target is treated as a clear bypass loop. +# Third same-target workspace violation in a turn escalates to "stop retrying". _MAX_REPEAT_WORKSPACE_VIOLATIONS = 2 EMPTY_FINAL_RESPONSE_MESSAGE = ( @@ -107,29 +102,7 @@ def repeated_external_lookup_error( ) -# --- Workspace-violation throttle -------------------------------------------- -# -# When ``restrict_to_workspace`` is on and the LLM tries to read or exec -# something outside of the workspace, we want to *tell* the model that it -# hit a hard policy boundary -- not silently abort the whole turn and not -# allow it to spin forever swapping ``read_file`` for ``exec cat`` for -# ``python -c open(...)`` (the actual loop reported in #3493). The strategy -# is two-fold: -# -# 1. Each individual guard error already includes structured instructions -# that tell the model "don't try to bypass this; ask the user for help". -# 2. We additionally count how many times the *same outside target* has -# been refused within the current turn. After two free attempts the -# third refusal swaps in a much more forceful message that quotes the -# target path and explicitly orders the model to stop and surface the -# boundary back to the user. The model is still free to do something -# else (different target, different question) -- only the bypass loop -# is interrupted. -# -# This intentionally does *not* fatal-abort the turn: max_iterations and -# the empty-final-response retries already provide the ultimate ceiling -# for runaway loops, and aborting is what produced the silent-hang bug -# in #3605 in the first place. +# Workspace-boundary violations are soft errors, with per-target throttling. _OUTSIDE_PATH_PATTERN = re.compile(r"(?:^|[\s|>'\"])((?:/[^\s\"'>;|<]+)|(?:~[^\s\"'>;|<]+))") @@ -138,14 +111,7 @@ def workspace_violation_signature( tool_name: str, arguments: dict[str, Any], ) -> str | None: - """Return a stable signature for the outside-workspace target a tool tried. - - The signature is shared across tool names so that the LLM cannot bypass - the throttle by switching from ``read_file`` to ``exec cat`` to - ``python -c open(...)`` against the same path. Returns ``None`` when - the call has no obvious outside target (e.g. SSRF rejections, deny - pattern hits, or tools whose argument shape we don't understand). - """ + """Return a stable cross-tool signature for the outside-workspace target.""" for key in ("path", "file_path", "target", "source", "destination"): val = arguments.get(key) if isinstance(val, str) and val.strip(): @@ -167,9 +133,9 @@ def workspace_violation_signature( def _normalize_violation_target(raw: str) -> str: """Normalize *raw* path so that equivalent spellings collide on the same key.""" try: - normalized = str(Path(raw).expanduser().resolve()) + normalized = Path(raw).expanduser().resolve().as_posix() except Exception: - normalized = raw + normalized = raw.replace("\\", "/") return f"violation:{normalized}".lower() @@ -178,15 +144,7 @@ def repeated_workspace_violation_error( arguments: dict[str, Any], seen_counts: dict[str, int], ) -> str | None: - """Return an escalated error string after repeated bypass attempts. - - Returns ``None`` while the LLM is still within the soft retry budget -- - callers should fall back to the tool's own error message in that case. - Once the budget is exceeded, returns a hard "stop trying" instruction - that quotes the offending target. Throttle state lives in - *seen_counts* (a per-turn dict), so the budget naturally resets across - turns without persisting LLM-controlled keys. - """ + """Return an escalated error after repeated bypass attempts.""" signature = workspace_violation_signature(tool_name, arguments) if signature is None: return None diff --git a/tests/tools/test_exec_security.py b/tests/tools/test_exec_security.py index b7ccf6a2b..844d535c0 100644 --- a/tests/tools/test_exec_security.py +++ b/tests/tools/test_exec_security.py @@ -3,6 +3,7 @@ from __future__ import annotations import socket +import sys from unittest.mock import patch import pytest @@ -212,6 +213,7 @@ def test_exec_allows_benign_device_targets_inside_workspace(tmp_path, command): @pytest.mark.asyncio +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX rm and /dev/null syntax") async def test_exec_3599_regression_rm_with_dev_null_redirect(tmp_path): """#3599: ``rm 2>/dev/null`` must succeed against the workspace guard.""" workspace = tmp_path / "workspace"