chore(runner): tighten workspace guard comments and Windows tests

Keep the workspace-boundary changes easier to review by trimming long explanatory comments down to short local notes. Also make the #3599 POSIX command regression skip on Windows and normalize workspace violation signatures to POSIX separators so the throttle tests are platform-stable.

Tests:
- uv run pytest tests/tools/test_exec_security.py tests/utils/test_workspace_violation_throttle.py -q
- uv run pytest -q

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Xubin Ren 2026-05-03 17:16:42 +00:00 committed by Xubin Ren
parent b8406be215
commit 2a7433b7ec
4 changed files with 28 additions and 116 deletions

View File

@ -240,9 +240,7 @@ class AgentRunner:
stop_reason = "completed"
tool_events: list[dict[str, str]] = []
external_lookup_counts: dict[str, int] = {}
# Tracks repeated bypass attempts against the same outside-workspace
# target within this turn. See ``repeated_workspace_violation_error``
# in ``nanobot.utils.runtime`` for the throttle policy.
# Per-turn throttle for repeated attempts against the same outside target.
workspace_violation_counts: dict[str, int] = {}
empty_content_retries = 0
length_recovery_count = 0
@ -808,10 +806,7 @@ class AgentRunner:
payload = f"Error: {type(exc).__name__}: {exc}"
handled = self._classify_violation(
raw_text=str(exc),
# Match the legacy behavior here: the exception branch never
# appended the "try a different approach" hint, even on the
# workspace-violation path -- preserve that for callers that
# eyeball the exact tool message.
# Preserve legacy exception payloads without the retry hint.
soft_payload=payload,
ssrf_payload=payload,
ssrf_error=exc,
@ -854,17 +849,10 @@ class AgentRunner:
detail = detail[:120] + "..."
return result, {"name": tool_call.name, "status": "ok", "detail": detail}, None
# SSRF rejections remain a hard, non-recoverable safety boundary: a single
# successful internal-URL fetch can leak cloud metadata, so we never let
# the LLM "retry" with a different phrasing of the same target.
# SSRF remains fatal; workspace path boundaries are soft + throttled.
_SSRF_MARKER: str = "internal/private url detected"
# Markers that identify "tried to access something outside the workspace".
# Unlike SSRF these are intentionally *non-fatal* (#3599 / #3605):
# - The structured error message itself tells the model not to bypass.
# - ``repeated_workspace_violation_error`` throttles the loop reported
# in #3493 by escalating after two attempts against the same target.
# - max_iterations is the ultimate ceiling, so we never need to abort.
# Non-SSRF boundary markers returned to the LLM as recoverable tool errors.
_WORKSPACE_VIOLATION_MARKERS: tuple[str, ...] = (
"outside the configured workspace",
"outside allowed directory",
@ -880,12 +868,7 @@ class AgentRunner:
@classmethod
def _is_workspace_violation(cls, text: str) -> bool:
"""True when *text* looks like *any* policy boundary rejection.
Kept as a public-ish hook for callers that need a yes/no signal
(logging, telemetry). The runner itself uses the more specific
``_is_ssrf_violation`` to decide what is fatal.
"""
"""True when *text* looks like any policy boundary rejection."""
if not text:
return False
lowered = text.lower()
@ -904,32 +887,14 @@ class AgentRunner:
tool_call: ToolCallRequest,
workspace_violation_counts: dict[str, int],
) -> tuple[Any, dict[str, str], BaseException | None] | None:
"""Apply violation policy to a tool failure, or pass through.
Returns a fully-formed (payload, event, error) triple when *raw_text*
looks like a policy boundary rejection. Returns ``None`` when the
caller should fall through to its generic per-branch handling.
- SSRF stays fatal -- a single successful internal fetch can leak
cloud metadata, so retrying with a different URL phrasing is
never acceptable. We mutate ``event`` in place so the caller's
telemetry stays consistent.
- All other workspace-bound rejections become soft tool errors.
Each repeated attempt against the same outside target bumps a
per-turn counter; after the soft retry budget is exhausted we
replace the message body with an explicit "stop trying to bypass
the policy" instruction (see #3493 for the original bypass-loop
that motivated PR #3493's hard-abort, and #3599 / #3605 for why
the hard-abort backfired).
"""
"""Classify safety-boundary failures, or return ``None`` to pass through."""
if self._is_ssrf_violation(raw_text):
logger.warning(
"Tool {} blocked by SSRF guard; aborting turn: {}",
tool_call.name,
raw_text.replace("\n", " ").strip()[:200],
)
event["detail"] = ("workspace_violation: "
+ raw_text.replace("\n", " ").strip())[:160]
event["detail"] = self._event_detail("workspace_violation: ", raw_text)
return ssrf_payload, event, ssrf_error
if self._is_workspace_violation(raw_text):
@ -938,20 +903,25 @@ class AgentRunner:
tool_call.arguments,
workspace_violation_counts,
)
event["detail"] = ("workspace_violation: "
+ raw_text.replace("\n", " ").strip())[:160]
event["detail"] = self._event_detail("workspace_violation: ", raw_text)
if escalation is not None:
logger.warning(
"Tool {} hit workspace boundary repeatedly; escalating hint",
tool_call.name,
)
event["detail"] = ("workspace_violation_escalated: "
+ raw_text.replace("\n", " ").strip())[:160]
event["detail"] = self._event_detail(
"workspace_violation_escalated: ",
raw_text,
)
return escalation, event, None
return soft_payload, event, None
return None
@staticmethod
def _event_detail(prefix: str, text: str, limit: int = 160) -> str:
return (prefix + text.replace("\n", " ").strip())[:limit]
async def _emit_checkpoint(
self,
spec: AgentRunSpec,

View File

@ -19,11 +19,7 @@ from nanobot.config.paths import get_media_dir
_IS_WINDOWS = sys.platform == "win32"
# Appended to every workspace / safety guard rejection so the LLM is told
# explicitly that this is a policy boundary (not a transient failure) and
# that bypass loops will not change the answer. The throttle in
# ``nanobot.utils.runtime.repeated_workspace_violation_error`` upgrades
# this further once the model keeps targeting the same path.
# Policy note appended to recoverable workspace-boundary guard errors.
_WORKSPACE_BOUNDARY_NOTE = (
"\n\nNote: this is a hard policy boundary, not a transient failure. "
"Do NOT retry with shell tricks (symlinks, base64 piping, alternative "
@ -97,10 +93,7 @@ class ExecTool(Tool):
_MAX_TIMEOUT = 600
_MAX_OUTPUT = 10_000
# Kernel device files that are universally safe as stdio redirect targets
# (e.g. ``cmd 2>/dev/null``). Without this allow-list the workspace guard
# treats them as ``path outside working dir`` and the LLM ends up unable
# to silence stderr inside the workspace (#3599).
# Kernel device files safe as stdio redirect targets (#3599).
_BENIGN_DEVICE_PATHS: frozenset[str] = frozenset({
"/dev/null",
"/dev/zero",
@ -325,10 +318,7 @@ class ExecTool(Tool):
from nanobot.security.network import contains_internal_url
if contains_internal_url(cmd):
# SSRF: stay short and direct. The runner classifies this
# marker as a hard, non-recoverable boundary, so the
# _WORKSPACE_BOUNDARY_NOTE policy text doesn't apply here --
# we don't want the model to retry at all.
# SSRF stays fatal in the runner, so keep this marker direct.
return "Error: Command blocked by safety guard (internal/private URL detected)"
if self.restrict_to_workspace:
@ -371,15 +361,7 @@ class ExecTool(Tool):
@classmethod
def _is_benign_device_path(cls, path: str) -> bool:
"""Return True when *path* is a kernel device file we should never block.
Treats ``/dev/null``, the standard streams, ``/dev/random``, etc. as
always-safe targets so that idiomatic stdio plumbing such as
``cmd 2>/dev/null`` or ``echo done >/dev/stderr`` is not flagged as a
workspace violation regardless of the configured working directory.
Also accepts ``/dev/fd/N`` because those are per-process aliases for
already-open file descriptors and never escape the workspace.
"""
"""Return True for kernel device files that should never be workspace-blocked."""
if path in cls._BENIGN_DEVICE_PATHS:
return True
return path.startswith("/dev/fd/")

View File

@ -12,12 +12,7 @@ from nanobot.utils.helpers import stringify_text_blocks
_MAX_REPEAT_EXTERNAL_LOOKUPS = 2
# Workspace-violation throttle: how many times the LLM is allowed to bump
# against the same outside-workspace target *within a single turn* before the
# tool result is escalated with a hard "stop trying to bypass the policy"
# instruction. Two free attempts give the model room to e.g. read_file then
# fall back to exec without immediately escalating; the third attempt at the
# same target is treated as a clear bypass loop.
# Third same-target workspace violation in a turn escalates to "stop retrying".
_MAX_REPEAT_WORKSPACE_VIOLATIONS = 2
EMPTY_FINAL_RESPONSE_MESSAGE = (
@ -107,29 +102,7 @@ def repeated_external_lookup_error(
)
# --- Workspace-violation throttle --------------------------------------------
#
# When ``restrict_to_workspace`` is on and the LLM tries to read or exec
# something outside of the workspace, we want to *tell* the model that it
# hit a hard policy boundary -- not silently abort the whole turn and not
# allow it to spin forever swapping ``read_file`` for ``exec cat`` for
# ``python -c open(...)`` (the actual loop reported in #3493). The strategy
# is two-fold:
#
# 1. Each individual guard error already includes structured instructions
# that tell the model "don't try to bypass this; ask the user for help".
# 2. We additionally count how many times the *same outside target* has
# been refused within the current turn. After two free attempts the
# third refusal swaps in a much more forceful message that quotes the
# target path and explicitly orders the model to stop and surface the
# boundary back to the user. The model is still free to do something
# else (different target, different question) -- only the bypass loop
# is interrupted.
#
# This intentionally does *not* fatal-abort the turn: max_iterations and
# the empty-final-response retries already provide the ultimate ceiling
# for runaway loops, and aborting is what produced the silent-hang bug
# in #3605 in the first place.
# Workspace-boundary violations are soft errors, with per-target throttling.
_OUTSIDE_PATH_PATTERN = re.compile(r"(?:^|[\s|>'\"])((?:/[^\s\"'>;|<]+)|(?:~[^\s\"'>;|<]+))")
@ -138,14 +111,7 @@ def workspace_violation_signature(
tool_name: str,
arguments: dict[str, Any],
) -> str | None:
"""Return a stable signature for the outside-workspace target a tool tried.
The signature is shared across tool names so that the LLM cannot bypass
the throttle by switching from ``read_file`` to ``exec cat`` to
``python -c open(...)`` against the same path. Returns ``None`` when
the call has no obvious outside target (e.g. SSRF rejections, deny
pattern hits, or tools whose argument shape we don't understand).
"""
"""Return a stable cross-tool signature for the outside-workspace target."""
for key in ("path", "file_path", "target", "source", "destination"):
val = arguments.get(key)
if isinstance(val, str) and val.strip():
@ -167,9 +133,9 @@ def workspace_violation_signature(
def _normalize_violation_target(raw: str) -> str:
"""Normalize *raw* path so that equivalent spellings collide on the same key."""
try:
normalized = str(Path(raw).expanduser().resolve())
normalized = Path(raw).expanduser().resolve().as_posix()
except Exception:
normalized = raw
normalized = raw.replace("\\", "/")
return f"violation:{normalized}".lower()
@ -178,15 +144,7 @@ def repeated_workspace_violation_error(
arguments: dict[str, Any],
seen_counts: dict[str, int],
) -> str | None:
"""Return an escalated error string after repeated bypass attempts.
Returns ``None`` while the LLM is still within the soft retry budget --
callers should fall back to the tool's own error message in that case.
Once the budget is exceeded, returns a hard "stop trying" instruction
that quotes the offending target. Throttle state lives in
*seen_counts* (a per-turn dict), so the budget naturally resets across
turns without persisting LLM-controlled keys.
"""
"""Return an escalated error after repeated bypass attempts."""
signature = workspace_violation_signature(tool_name, arguments)
if signature is None:
return None

View File

@ -3,6 +3,7 @@
from __future__ import annotations
import socket
import sys
from unittest.mock import patch
import pytest
@ -212,6 +213,7 @@ def test_exec_allows_benign_device_targets_inside_workspace(tmp_path, command):
@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX rm and /dev/null syntax")
async def test_exec_3599_regression_rm_with_dev_null_redirect(tmp_path):
"""#3599: ``rm <ws-path> 2>/dev/null`` must succeed against the workspace guard."""
workspace = tmp_path / "workspace"