fix(runner): soft workspace boundary + per-target throttle (#3493 #3599 #3605)

Replaces PR #3493's blanket fatal abort with a "tell the model + throttle
the bypass loop" policy.  Workspace-bound rejections are now ordinary
recoverable tool errors enriched with a structured "this is a hard policy
boundary" instruction; SSRF stays the only marker that aborts the turn.

Why the fatal-abort approach broke
----------------------------------
PR #3493 promoted every shell `_guard_command` and filesystem path-resolution
rejection to a turn-fatal RuntimeError.  Two of those rejections (`path
outside working dir` and `path traversal detected`) come from heuristic
substring scans over the raw command string, so legitimate commands like
`rm <ws>/x.txt 2>/dev/null` or `find . -type f` killed the user's turn (#3599).  On
channels with outbound dedupe (Telegram) the user just saw silence (#3605),
and the noise polluted the LLM's context until it started hallucinating
guard rejections on plain relative paths (#3597).

Why we still need *some* throttle
---------------------------------
The original #3493 pain point was real: the LLM, refused once, would
swap tools and try again -- read_file -> exec cat -> exec cp -> bash -c
-> ln -sf -> python -c open(...).  Simply removing the fatal abort would
let that loop run wild until max_iterations.

What this commit does
---------------------
- `nanobot/utils/runtime.py`: add `workspace_violation_signature` and
  `repeated_workspace_violation_error`.  The signature normalizes
  filesystem `path` arguments and the first absolute path inside an
  exec command, so swapping tools against the same outside target hits
  the same throttle bucket.  Two soft attempts are allowed; the third
  attempt's tool result is replaced with a hard "stop trying to bypass"
  message that quotes the target path and tells the model to ask the
  user for help.

- `nanobot/agent/runner.py`: split classification into `_is_ssrf_violation`
  (still fatal) and `_is_workspace_violation` (now soft).  All three
  failure branches in `_run_tool` (prep_error / exception / Error
  result) route through a shared `_classify_violation` that bumps the
  per-turn workspace_violation_counts dict and either keeps the tool's
  own message or substitutes the throttle escalation.  `_execute_tools`
  now threads that dict alongside the existing external_lookup_counts.

- `nanobot/agent/tools/shell.py`: append a structured boundary note to
  every workspace-bound guard rejection (`working_dir could not be
  resolved`, `working_dir is outside`, `path outside working dir`,
  `path traversal detected`).  SSRF errors stay short and direct so the
  model doesn't try to "phrase around" them.  Existing `2>/dev/null`
  allow-list and benign device passthrough from the previous commit
  remain.

- `nanobot/agent/tools/filesystem.py`: append the same boundary note to
  the `outside allowed directory` PermissionError so read_file / write_file
  / list_dir errors give the LLM the same explicit hint.
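For illustration, here is a condensed, self-contained sketch of the two
runtime helpers described above.  It mirrors the bodies added in this
commit's diff but abbreviates the escalation message and omits the
working_dir fallback and logging, so treat it as a behavioral sketch
rather than the shipped code:

```python
import re
from pathlib import Path

_MAX_REPEAT_WORKSPACE_VIOLATIONS = 2  # two soft attempts, escalate on the third
# First absolute (or ~-prefixed) path-looking token in a shell command.
_OUTSIDE_PATH_PATTERN = re.compile(
    r"(?:^|[\s|>'\"])((?:/[^\s\"'>;|<]+)|(?:~[^\s\"'>;|<]+))"
)


def workspace_violation_signature(tool_name, arguments):
    # Filesystem-style tools carry the target in a path-like argument.
    for key in ("path", "file_path", "target", "source", "destination"):
        val = arguments.get(key)
        if isinstance(val, str) and val.strip():
            return _normalize(val.strip())
    # Shell-style tools: pull the first absolute path out of the command,
    # so `exec cat /x` and `read_file /x` land in the same bucket.
    if tool_name in {"exec", "shell"}:
        cmd = str(arguments.get("command") or "").strip()
        if cmd:
            match = _OUTSIDE_PATH_PATTERN.search(cmd)
            if match:
                return _normalize(match.group(1))
    return None


def _normalize(raw):
    # Equivalent spellings of the same path must collide on one key.
    try:
        normalized = str(Path(raw).expanduser().resolve())
    except Exception:
        normalized = raw
    return f"violation:{normalized}".lower()


def repeated_workspace_violation_error(tool_name, arguments, seen_counts):
    signature = workspace_violation_signature(tool_name, arguments)
    if signature is None:
        return None
    count = seen_counts.get(signature, 0) + 1
    seen_counts[signature] = count
    if count <= _MAX_REPEAT_WORKSPACE_VIOLATIONS:
        return None  # still within the soft retry budget
    target = signature.split("violation:", 1)[1]
    # Abbreviated stand-in for the full "stop trying to bypass" message.
    return f"Error: refusing repeated workspace-bypass attempts on '{target}'."


# read_file, exec cat, and exec cp against the same path share one bucket:
counts = {}
a = repeated_workspace_violation_error("read_file", {"path": "/etc/passwd"}, counts)
b = repeated_workspace_violation_error("exec", {"command": "cat /etc/passwd"}, counts)
c = repeated_workspace_violation_error("exec", {"command": "cp /etc/passwd ."}, counts)
print(a, b)           # None None -- two soft attempts allowed
print(c is not None)  # True -- third attempt against the same target escalates
```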

Tests
-----
- `tests/utils/test_workspace_violation_throttle.py` (new): signature
  collapses across read_file/exec/python -c against the same path,
  different paths get independent budgets, escalation only fires after
  the third attempt.

- `tests/agent/test_runner.py`:
  - `test_runner_does_not_abort_on_workspace_violation_anymore` -- v2
    contract: filesystem PermissionError is now soft, runner moves to
    the next iteration and finalizes cleanly.
  - `test_is_ssrf_violation_remains_fatal` + the existing
    `test_runner_aborts_on_ssrf_violation` -- SSRF still aborts on the
    first attempt.
  - `test_runner_lets_llm_recover_from_shell_guard_path_outside` -- end
    to end recovery from `path outside working dir`.
  - `test_runner_throttles_repeated_workspace_bypass_attempts` -- four
    bypass attempts against the same outside target produce at least
    one `workspace_violation_escalated` event and the run completes
    naturally without aborting the turn.
  - The two `_execute_tools` direct-call tests now pass the new
    workspace_violation_counts dict.

- `tests/tools/test_tool_validation.py`: relax three `==` assertions
  to `startswith` + "hard policy boundary" substring check to match
  the new structured error messages.

- `tests/tools/test_exec_security.py` keeps the prior `2>/dev/null`
  regression and the `> /etc/issue` negative case from the previous
  commit on this branch -- they still pass under the new policy.

Coverage status: full pytest 2648 passed / 2 skipped (was 2638 / 2
on origin/main).  Ruff is clean for every file touched in this commit.

Co-authored-by: Cursor <cursoragent@cursor.com>
Xubin Ren 2026-05-03 17:04:08 +00:00 committed by Xubin Ren
parent 7742f8fbdc
commit b8406be215
7 changed files with 585 additions and 117 deletions

nanobot/agent/runner.py

@@ -33,6 +33,7 @@ from nanobot.utils.runtime import (
     ensure_nonempty_tool_result,
     is_blank_text,
     repeated_external_lookup_error,
+    repeated_workspace_violation_error,
 )

 _DEFAULT_ERROR_MESSAGE = "Sorry, I encountered an error calling the AI model."
@@ -239,6 +240,10 @@ class AgentRunner:
         stop_reason = "completed"
         tool_events: list[dict[str, str]] = []
         external_lookup_counts: dict[str, int] = {}
+        # Tracks repeated bypass attempts against the same outside-workspace
+        # target within this turn.  See ``repeated_workspace_violation_error``
+        # in ``nanobot.utils.runtime`` for the throttle policy.
+        workspace_violation_counts: dict[str, int] = {}
         empty_content_retries = 0
         length_recovery_count = 0
         had_injections = False
@@ -314,6 +319,7 @@ class AgentRunner:
                 spec,
                 tool_calls,
                 external_lookup_counts,
+                workspace_violation_counts,
             )
             tool_events.extend(new_events)
             context.tool_results = list(results)
@@ -698,20 +704,25 @@ class AgentRunner:
         spec: AgentRunSpec,
         tool_calls: list[ToolCallRequest],
         external_lookup_counts: dict[str, int],
+        workspace_violation_counts: dict[str, int],
     ) -> tuple[list[Any], list[dict[str, str]], BaseException | None]:
         batches = self._partition_tool_batches(spec, tool_calls)
         tool_results: list[tuple[Any, dict[str, str], BaseException | None]] = []
         for batch in batches:
             if spec.concurrent_tools and len(batch) > 1:
                 batch_results = await asyncio.gather(*(
-                    self._run_tool(spec, tool_call, external_lookup_counts)
+                    self._run_tool(
+                        spec, tool_call, external_lookup_counts, workspace_violation_counts,
+                    )
                     for tool_call in batch
                 ))
                 tool_results.extend(batch_results)
             else:
                 batch_results = []
                 for tool_call in batch:
-                    result = await self._run_tool(spec, tool_call, external_lookup_counts)
+                    result = await self._run_tool(
+                        spec, tool_call, external_lookup_counts, workspace_violation_counts,
+                    )
                     tool_results.append(result)
                     batch_results.append(result)
                     if isinstance(result[2], AskUserInterrupt):
@@ -734,6 +745,7 @@ class AgentRunner:
         spec: AgentRunSpec,
         tool_call: ToolCallRequest,
         external_lookup_counts: dict[str, int],
+        workspace_violation_counts: dict[str, int],
     ) -> tuple[Any, dict[str, str], BaseException | None]:
         hint = "\n\n[Analyze the error above and try a different approach.]"
         lookup_error = repeated_external_lookup_error(
@@ -763,16 +775,20 @@ class AgentRunner:
                 "status": "error",
                 "detail": prep_error.split(": ", 1)[-1][:120],
             }
-            if self._is_workspace_violation(prep_error):
-                logger.warning(
-                    "Tool {} blocked by workspace/safety guard during preparation; aborting turn: {}",
-                    tool_call.name,
-                    prep_error.replace("\n", " ").strip()[:200],
-                )
-                event["detail"] = ("workspace_violation: "
-                                   + prep_error.replace("\n", " ").strip())[:160]
-                return prep_error, event, RuntimeError(prep_error)
-            return prep_error + hint, event, RuntimeError(prep_error) if spec.fail_on_tool_error else None
+            handled = self._classify_violation(
+                raw_text=prep_error,
+                soft_payload=prep_error + hint,
+                ssrf_payload=prep_error,
+                ssrf_error=RuntimeError(prep_error),
+                event=event,
+                tool_call=tool_call,
+                workspace_violation_counts=workspace_violation_counts,
+            )
+            if handled is not None:
+                return handled
+            return prep_error + hint, event, (
+                RuntimeError(prep_error) if spec.fail_on_tool_error else None
+            )
         try:
             if tool is not None:
                 result = await tool.execute(**params)
@@ -789,18 +805,25 @@ class AgentRunner:
             if isinstance(exc, AskUserInterrupt):
                 event["status"] = "waiting"
                 return "", event, exc
-            if self._is_workspace_violation(str(exc)):
-                logger.warning(
-                    "Tool {} blocked by workspace/safety guard; aborting turn: {}",
-                    tool_call.name,
-                    str(exc).replace("\n", " ").strip()[:200],
-                )
-                event["detail"] = ("workspace_violation: "
-                                   + str(exc).replace("\n", " ").strip())[:160]
-                return f"Error: {type(exc).__name__}: {exc}", event, exc
+            payload = f"Error: {type(exc).__name__}: {exc}"
+            handled = self._classify_violation(
+                raw_text=str(exc),
+                # Match the legacy behavior here: the exception branch never
+                # appended the "try a different approach" hint, even on the
+                # workspace-violation path -- preserve that for callers that
+                # eyeball the exact tool message.
+                soft_payload=payload,
+                ssrf_payload=payload,
+                ssrf_error=exc,
+                event=event,
+                tool_call=tool_call,
+                workspace_violation_counts=workspace_violation_counts,
+            )
+            if handled is not None:
+                return handled
             if spec.fail_on_tool_error:
-                return f"Error: {type(exc).__name__}: {exc}", event, exc
-            return f"Error: {type(exc).__name__}: {exc}", event, None
+                return payload, event, exc
+            return payload, event, None

         if isinstance(result, str) and result.startswith("Error"):
             event = {
@@ -808,17 +831,17 @@ class AgentRunner:
                 "status": "error",
                 "detail": result.replace("\n", " ").strip()[:120],
             }
-
-            # check the outside workspace error and break loop
-            if self._is_workspace_violation(result):
-                logger.warning(
-                    "Tool {} blocked by workspace/safety guard; aborting turn: {}",
-                    tool_call.name,
-                    result.replace("\n", " ").strip()[:200],
-                )
-                event["detail"] = ("workspace_violation: "
-                                   + result.replace("\n", " ").strip())[:160]
-                return result, event, RuntimeError(result)
+            handled = self._classify_violation(
+                raw_text=result,
+                soft_payload=result + hint,
+                ssrf_payload=result,
+                ssrf_error=RuntimeError(result),
+                event=event,
+                tool_call=tool_call,
+                workspace_violation_counts=workspace_violation_counts,
+            )
+            if handled is not None:
+                return handled
             if spec.fail_on_tool_error:
                 return result + hint, event, RuntimeError(result)
             return result + hint, event, None
@@ -831,39 +854,103 @@ class AgentRunner:
             detail = detail[:120] + "..."
         return result, {"name": tool_call.name, "status": "ok", "detail": detail}, None

-    # Markers identifying tool results that represent a *hard* workspace /
-    # safety boundary rejection -- only these abort the agent loop.
-    #
-    # We deliberately keep this list narrow (#3599 / #3605):
-    # - The first four come from explicit path-resolution checks in
-    #   ``filesystem.py`` and ``shell.py`` that cannot false-positive on user
-    #   payloads -- if you see them, the LLM truly tried to escape the
-    #   workspace.
-    # - "internal/private url detected" stays fatal because SSRF is a real
-    #   security boundary; allowing the LLM to "retry" would just let it
-    #   poke internal infra with a different URL phrasing.
-    # - "path traversal detected" and "path outside working dir" are
-    #   intentionally *not* listed: both come from the heuristic
-    #   ``_guard_command`` checks in ``shell.py`` which scan the raw command
-    #   string and routinely false-positive on legitimate constructs (e.g.
-    #   ``2>/dev/null`` redirects, quoted ``..`` arguments to ``sed`` /
-    #   ``find``, paths inside inline scripts).  Treating them as fatal
-    #   silently kills user turns (#3599) and prevents the agent from
-    #   self-correcting by trying a different approach (#3605).
-    _WORKSPACE_BLOCK_MARKERS: tuple[str, ...] = (
+    # SSRF rejections remain a hard, non-recoverable safety boundary: a single
+    # successful internal-URL fetch can leak cloud metadata, so we never let
+    # the LLM "retry" with a different phrasing of the same target.
+    _SSRF_MARKER: str = "internal/private url detected"
+
+    # Markers that identify "tried to access something outside the workspace".
+    # Unlike SSRF these are intentionally *non-fatal* (#3599 / #3605):
+    # - The structured error message itself tells the model not to bypass.
+    # - ``repeated_workspace_violation_error`` throttles the loop reported
+    #   in #3493 by escalating after two attempts against the same target.
+    # - max_iterations is the ultimate ceiling, so we never need to abort.
+    _WORKSPACE_VIOLATION_MARKERS: tuple[str, ...] = (
         "outside the configured workspace",
         "outside allowed directory",
         "working_dir is outside",
         "working_dir could not be resolved",
-        "internal/private url detected",
+        "path outside working dir",
+        "path traversal detected",
     )

+    @classmethod
+    def _is_ssrf_violation(cls, text: str) -> bool:
+        return bool(text) and cls._SSRF_MARKER in text.lower()
+
     @classmethod
     def _is_workspace_violation(cls, text: str) -> bool:
+        """True when *text* looks like *any* policy boundary rejection.
+
+        Kept as a public-ish hook for callers that need a yes/no signal
+        (logging, telemetry).  The runner itself uses the more specific
+        ``_is_ssrf_violation`` to decide what is fatal.
+        """
         if not text:
             return False
         lowered = text.lower()
-        return any(marker in lowered for marker in cls._WORKSPACE_BLOCK_MARKERS)
+        if cls._SSRF_MARKER in lowered:
+            return True
+        return any(marker in lowered for marker in cls._WORKSPACE_VIOLATION_MARKERS)
+
+    def _classify_violation(
+        self,
+        *,
+        raw_text: str,
+        soft_payload: str,
+        ssrf_payload: str,
+        ssrf_error: BaseException,
+        event: dict[str, str],
+        tool_call: ToolCallRequest,
+        workspace_violation_counts: dict[str, int],
+    ) -> tuple[Any, dict[str, str], BaseException | None] | None:
+        """Apply violation policy to a tool failure, or pass through.
+
+        Returns a fully-formed (payload, event, error) triple when *raw_text*
+        looks like a policy boundary rejection.  Returns ``None`` when the
+        caller should fall through to its generic per-branch handling.
+
+        - SSRF stays fatal -- a single successful internal fetch can leak
+          cloud metadata, so retrying with a different URL phrasing is
+          never acceptable.  We mutate ``event`` in place so the caller's
+          telemetry stays consistent.
+        - All other workspace-bound rejections become soft tool errors.
+          Each repeated attempt against the same outside target bumps a
+          per-turn counter; after the soft retry budget is exhausted we
+          replace the message body with an explicit "stop trying to bypass
+          the policy" instruction (see #3493 for the original bypass-loop
+          that motivated PR #3493's hard-abort, and #3599 / #3605 for why
+          the hard-abort backfired).
+        """
+        if self._is_ssrf_violation(raw_text):
+            logger.warning(
+                "Tool {} blocked by SSRF guard; aborting turn: {}",
+                tool_call.name,
+                raw_text.replace("\n", " ").strip()[:200],
+            )
+            event["detail"] = ("workspace_violation: "
+                               + raw_text.replace("\n", " ").strip())[:160]
+            return ssrf_payload, event, ssrf_error
+        if self._is_workspace_violation(raw_text):
+            escalation = repeated_workspace_violation_error(
+                tool_call.name,
+                tool_call.arguments,
+                workspace_violation_counts,
+            )
+            event["detail"] = ("workspace_violation: "
+                               + raw_text.replace("\n", " ").strip())[:160]
+            if escalation is not None:
+                logger.warning(
+                    "Tool {} hit workspace boundary repeatedly; escalating hint",
+                    tool_call.name,
+                )
+                event["detail"] = ("workspace_violation_escalated: "
+                                   + raw_text.replace("\n", " ").strip())[:160]
+                return escalation, event, None
+            return soft_payload, event, None
+        return None

     async def _emit_checkpoint(
         self,
nanobot/agent/tools/filesystem.py

@@ -14,6 +14,13 @@ from nanobot.utils.helpers import build_image_content_blocks, detect_image_mime
 from nanobot.config.paths import get_media_dir

+_FS_WORKSPACE_BOUNDARY_NOTE = (
+    " (this is a hard policy boundary, not a transient failure; "
+    "do not retry with shell tricks or alternative tools, and ask "
+    "the user how to proceed if the resource is genuinely required)"
+)
+

 def _resolve_path(
     path: str,
     workspace: Path | None = None,
@@ -29,7 +36,10 @@ def _resolve_path(
     media_path = get_media_dir().resolve()
     all_dirs = [allowed_dir] + [media_path] + (extra_allowed_dirs or [])
     if not any(_is_under(resolved, d) for d in all_dirs):
-        raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
+        raise PermissionError(
+            f"Path {path} is outside allowed directory {allowed_dir}"
+            + _FS_WORKSPACE_BOUNDARY_NOTE
+        )
     return resolved

nanobot/agent/tools/shell.py

@@ -19,6 +19,20 @@ from nanobot.config.paths import get_media_dir

 _IS_WINDOWS = sys.platform == "win32"

+# Appended to every workspace / safety guard rejection so the LLM is told
+# explicitly that this is a policy boundary (not a transient failure) and
+# that bypass loops will not change the answer.  The throttle in
+# ``nanobot.utils.runtime.repeated_workspace_violation_error`` upgrades
+# this further once the model keeps targeting the same path.
+_WORKSPACE_BOUNDARY_NOTE = (
+    "\n\nNote: this is a hard policy boundary, not a transient failure. "
+    "Do NOT retry with shell tricks (symlinks, base64 piping, alternative "
+    "tools, working_dir overrides). If the user genuinely needs this "
+    "resource, tell them you cannot reach it under the current "
+    "restrict_to_workspace policy and ask how to proceed."
+)
+

 @tool_parameters(
     tool_parameters_schema(
         command=StringSchema("The shell command to execute"),
@@ -129,9 +143,15 @@ class ExecTool(Tool):
             requested = Path(cwd).expanduser().resolve()
             workspace_root = Path(self.working_dir).expanduser().resolve()
         except Exception:
-            return "Error: working_dir could not be resolved"
+            return (
+                "Error: working_dir could not be resolved"
+                + _WORKSPACE_BOUNDARY_NOTE
+            )
         if requested != workspace_root and workspace_root not in requested.parents:
-            return "Error: working_dir is outside the configured workspace"
+            return (
+                "Error: working_dir is outside the configured workspace"
+                + _WORKSPACE_BOUNDARY_NOTE
+            )

         guard_error = self._guard_command(command, cwd)
         if guard_error:
@@ -305,11 +325,18 @@ class ExecTool(Tool):
         from nanobot.security.network import contains_internal_url

         if contains_internal_url(cmd):
+            # SSRF: stay short and direct.  The runner classifies this
+            # marker as a hard, non-recoverable boundary, so the
+            # _WORKSPACE_BOUNDARY_NOTE policy text doesn't apply here --
+            # we don't want the model to retry at all.
             return "Error: Command blocked by safety guard (internal/private URL detected)"

         if self.restrict_to_workspace:
             if "..\\" in cmd or "../" in cmd:
-                return "Error: Command blocked by safety guard (path traversal detected)"
+                return (
+                    "Error: Command blocked by safety guard (path traversal detected)"
+                    + _WORKSPACE_BOUNDARY_NOTE
+                )

             cwd_path = Path(cwd).resolve()
@@ -335,7 +362,10 @@ class ExecTool(Tool):
                 and media_path not in p.parents
                 and p != media_path
             ):
-                return "Error: Command blocked by safety guard (path outside working dir)"
+                return (
+                    "Error: Command blocked by safety guard (path outside working dir)"
+                    + _WORKSPACE_BOUNDARY_NOTE
+                )

         return None

nanobot/utils/runtime.py

@@ -2,6 +2,8 @@

 from __future__ import annotations

+import re
+from pathlib import Path
 from typing import Any

 from loguru import logger
@@ -10,6 +12,14 @@ from nanobot.utils.helpers import stringify_text_blocks

 _MAX_REPEAT_EXTERNAL_LOOKUPS = 2

+# Workspace-violation throttle: how many times the LLM is allowed to bump
+# against the same outside-workspace target *within a single turn* before the
+# tool result is escalated with a hard "stop trying to bypass the policy"
+# instruction.  Two free attempts give the model room to e.g. read_file then
+# fall back to exec without immediately escalating; the third attempt at the
+# same target is treated as a clear bypass loop.
+_MAX_REPEAT_WORKSPACE_VIOLATIONS = 2
+
 EMPTY_FINAL_RESPONSE_MESSAGE = (
     "I completed the tool steps but couldn't produce a final answer. "
     "Please try again or narrow the task."
@@ -95,3 +105,108 @@ def repeated_external_lookup_error(
         "Error: repeated external lookup blocked. "
         "Use the results you already have to answer, or try a meaningfully different source."
     )
+
+
+# --- Workspace-violation throttle --------------------------------------------
+#
+# When ``restrict_to_workspace`` is on and the LLM tries to read or exec
+# something outside of the workspace, we want to *tell* the model that it
+# hit a hard policy boundary -- not silently abort the whole turn and not
+# allow it to spin forever swapping ``read_file`` for ``exec cat`` for
+# ``python -c open(...)`` (the actual loop reported in #3493).  The strategy
+# is two-fold:
+#
+# 1. Each individual guard error already includes structured instructions
+#    that tell the model "don't try to bypass this; ask the user for help".
+# 2. We additionally count how many times the *same outside target* has
+#    been refused within the current turn.  After two free attempts the
+#    third refusal swaps in a much more forceful message that quotes the
+#    target path and explicitly orders the model to stop and surface the
+#    boundary back to the user.  The model is still free to do something
+#    else (different target, different question) -- only the bypass loop
+#    is interrupted.
+#
+# This intentionally does *not* fatal-abort the turn: max_iterations and
+# the empty-final-response retries already provide the ultimate ceiling
+# for runaway loops, and aborting is what produced the silent-hang bug
+# in #3605 in the first place.
+
+_OUTSIDE_PATH_PATTERN = re.compile(r"(?:^|[\s|>'\"])((?:/[^\s\"'>;|<]+)|(?:~[^\s\"'>;|<]+))")
+
+
+def workspace_violation_signature(
+    tool_name: str,
+    arguments: dict[str, Any],
+) -> str | None:
+    """Return a stable signature for the outside-workspace target a tool tried.
+
+    The signature is shared across tool names so that the LLM cannot bypass
+    the throttle by switching from ``read_file`` to ``exec cat`` to
+    ``python -c open(...)`` against the same path.  Returns ``None`` when
+    the call has no obvious outside target (e.g. SSRF rejections, deny
+    pattern hits, or tools whose argument shape we don't understand).
+    """
+    for key in ("path", "file_path", "target", "source", "destination"):
+        val = arguments.get(key)
+        if isinstance(val, str) and val.strip():
+            return _normalize_violation_target(val.strip())
+    if tool_name in {"exec", "shell"}:
+        cmd = str(arguments.get("command") or "").strip()
+        if cmd:
+            match = _OUTSIDE_PATH_PATTERN.search(cmd)
+            if match:
+                return _normalize_violation_target(match.group(1))
+        cwd = str(arguments.get("working_dir") or "").strip()
+        if cwd:
+            return _normalize_violation_target(cwd)
+    return None
+
+
+def _normalize_violation_target(raw: str) -> str:
+    """Normalize *raw* path so that equivalent spellings collide on the same key."""
+    try:
+        normalized = str(Path(raw).expanduser().resolve())
+    except Exception:
+        normalized = raw
+    return f"violation:{normalized}".lower()
+
+
+def repeated_workspace_violation_error(
+    tool_name: str,
+    arguments: dict[str, Any],
+    seen_counts: dict[str, int],
+) -> str | None:
+    """Return an escalated error string after repeated bypass attempts.
+
+    Returns ``None`` while the LLM is still within the soft retry budget --
+    callers should fall back to the tool's own error message in that case.
+    Once the budget is exceeded, returns a hard "stop trying" instruction
+    that quotes the offending target.  Throttle state lives in
+    *seen_counts* (a per-turn dict), so the budget naturally resets across
+    turns without persisting LLM-controlled keys.
+    """
+    signature = workspace_violation_signature(tool_name, arguments)
+    if signature is None:
+        return None
+    count = seen_counts.get(signature, 0) + 1
+    seen_counts[signature] = count
+    if count <= _MAX_REPEAT_WORKSPACE_VIOLATIONS:
+        return None
+    logger.warning(
+        "Escalating repeated workspace bypass attempt {} (attempt {})",
+        signature[:160],
+        count,
+    )
+    target = signature.split("violation:", 1)[1] if "violation:" in signature else signature
+    return (
+        "Error: refusing repeated workspace-bypass attempts.\n"
+        f"You have tried to access '{target}' (or an equivalent path) "
+        f"{count} times in this turn. This is a hard policy boundary -- "
+        "switching tools, shell tricks, working_dir overrides, symlinks, "
+        "or base64 piping will NOT change the answer. Stop retrying. "
+        "If the user genuinely needs this resource, tell them you cannot "
+        "access it and ask how they want to proceed (e.g. copy the file "
+        "into the workspace, or disable restrict_to_workspace for this run)."
+    )

tests/agent/test_runner.py

@ -313,21 +313,33 @@ async def test_runner_returns_structured_tool_error():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_runner_stops_on_workspace_violation_without_fail_on_tool_error(): async def test_runner_does_not_abort_on_workspace_violation_anymore():
"""v2 behavior: workspace-bound rejections are *soft* tool errors.
Previously (PR #3493) any workspace boundary error became a fatal
RuntimeError that aborted the turn. That silently killed legitimate
workspace commands once the heuristic guard misfired (#3599 #3605), so
we now hand the error back to the LLM as a recoverable tool result and
rely on ``repeated_workspace_violation_error`` to throttle bypass loops.
"""
from nanobot.agent.runner import AgentRunSpec, AgentRunner from nanobot.agent.runner import AgentRunSpec, AgentRunner
provider = MagicMock() provider = MagicMock()
provider.chat_with_retry = AsyncMock(side_effect=[ provider.chat_with_retry = AsyncMock(side_effect=[
LLMResponse( LLMResponse(
content="working", content="trying outside",
tool_calls=[ToolCallRequest(id="call_1", name="read_file", arguments={"path": "/tmp/outside.md"})], tool_calls=[ToolCallRequest(
id="call_1", name="read_file", arguments={"path": "/tmp/outside.md"},
)],
), ),
LLMResponse(content="should not continue", tool_calls=[]), LLMResponse(content="ok, telling the user instead", tool_calls=[]),
]) ])
tools = MagicMock() tools = MagicMock()
tools.get_definitions.return_value = [] tools.get_definitions.return_value = []
tools.execute = AsyncMock( tools.execute = AsyncMock(
side_effect=PermissionError("Path /tmp/outside.md is outside allowed directory /workspace") side_effect=PermissionError(
"Path /tmp/outside.md is outside allowed directory /workspace"
)
) )
runner = AgentRunner(provider) runner = AgentRunner(provider)
@ -336,71 +348,92 @@ async def test_runner_stops_on_workspace_violation_without_fail_on_tool_error():
initial_messages=[], initial_messages=[],
tools=tools, tools=tools,
model="test-model", model="test-model",
max_iterations=2, max_iterations=3,
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
)) ))
assert provider.chat_with_retry.await_count == 1 assert provider.chat_with_retry.await_count == 2, (
assert result.stop_reason == "tool_error" "workspace violation must NOT short-circuit the loop"
assert "outside allowed directory" in (result.error or "") )
assert result.tool_events == [ assert result.stop_reason != "tool_error"
{ assert result.error is None
"name": "read_file", assert result.final_content == "ok, telling the user instead"
"status": "error", assert result.tool_events and result.tool_events[0]["status"] == "error"
"detail": "workspace_violation: Path /tmp/outside.md is outside allowed directory /workspace", # Detail still carries the workspace_violation breadcrumb for telemetry,
} # but the runner did not raise.
] assert "workspace_violation" in result.tool_events[0]["detail"]


def test_is_ssrf_violation_remains_fatal():
    """SSRF rejections are the only marker that stays turn-fatal.

    A single successful internal-URL fetch can leak cloud metadata, so we
    never let the LLM "retry" with a different URL phrasing -- contrast
    this with workspace-bound rejections which are soft + throttled in v2.
    """
    from nanobot.agent.runner import AgentRunner

    ssrf_msg = "Error: Command blocked by safety guard (internal/private URL detected)"
    assert AgentRunner._is_ssrf_violation(ssrf_msg) is True
    # Workspace-bound markers are NOT classified as SSRF.
    assert AgentRunner._is_ssrf_violation(
        "Error: Command blocked by safety guard (path outside working dir)"
    ) is False
    assert AgentRunner._is_ssrf_violation(
        "Path /tmp/x is outside allowed directory /ws"
    ) is False
    # Deny / allowlist filter messages stay non-fatal too.
    assert AgentRunner._is_ssrf_violation(
        "Error: Command blocked by deny pattern filter"
    ) is False


@pytest.mark.asyncio
async def test_runner_aborts_on_ssrf_violation():
    """SSRF still fatal-aborts the turn even though workspace ones are soft."""
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    provider = MagicMock()
    provider.chat_with_retry = AsyncMock(side_effect=[
        LLMResponse(
            content="curl-ing metadata",
            tool_calls=[ToolCallRequest(
                id="call_ssrf",
                name="exec",
                arguments={"command": "curl http://169.254.169.254"},
            )],
        ),
        LLMResponse(content="should NOT be reached", tool_calls=[]),
    ])
    tools = MagicMock()
    tools.get_definitions.return_value = []
    tools.execute = AsyncMock(return_value=(
        "Error: Command blocked by safety guard (internal/private URL detected)"
    ))
    runner = AgentRunner(provider)
    result = await runner.run(AgentRunSpec(
        initial_messages=[],
        tools=tools,
        model="test-model",
        max_iterations=3,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    ))
    assert provider.chat_with_retry.await_count == 1, "SSRF must abort immediately"
    assert result.stop_reason == "tool_error"
    assert "internal/private url detected" in (result.error or "").lower()


@pytest.mark.asyncio
async def test_runner_lets_llm_recover_from_shell_guard_path_outside():
    """Reporter scenario for #3599 / #3605 -- guard hit, agent recovers.

    The shell `_guard_command` heuristic fires on `2>/dev/null`-style
    redirects and other shell idioms. Before v2 that abort'd the whole
    turn (silent hang on Telegram per #3605); now the LLM gets the soft
    error back and can finalize on the next iteration.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner
@@ -443,7 +476,66 @@ async def test_runner_lets_llm_recover_from_shell_guard_path_outside():
    assert result.error is None
    assert result.final_content == "recovered final answer"
    assert result.tool_events and result.tool_events[0]["status"] == "error"
    # v2: detail keeps the breadcrumb but the runner did not raise.
    assert "workspace_violation" in result.tool_events[0]["detail"]


@pytest.mark.asyncio
async def test_runner_throttles_repeated_workspace_bypass_attempts():
    """#3493 motivation: stop the LLM bypass loop without aborting the turn.

    LLM keeps switching tools (read_file -> exec cat -> python -c open(...))
    against the same outside path. After the soft retry budget is exhausted
    the runner replaces the tool result with a hard "stop trying" message
    so the model finally gives up and surfaces the boundary to the user.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    bypass_attempts = [
        ToolCallRequest(
            id=f"a{i}", name="exec",
            arguments={"command": f"cat /Users/x/Downloads/01.md # try {i}"},
        )
        for i in range(4)
    ]
    responses: list[LLMResponse] = [
        LLMResponse(content=f"try {i}", tool_calls=[bypass_attempts[i]])
        for i in range(4)
    ]
    responses.append(LLMResponse(content="ok telling user", tool_calls=[]))
    provider = MagicMock()
    provider.chat_with_retry = AsyncMock(side_effect=responses)
    tools = MagicMock()
    tools.get_definitions.return_value = []
    tools.execute = AsyncMock(
        return_value="Error: Command blocked by safety guard (path outside working dir)"
    )
    runner = AgentRunner(provider)
    result = await runner.run(AgentRunSpec(
        initial_messages=[],
        tools=tools,
        model="test-model",
        max_iterations=10,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    ))
    # All 4 bypass attempts surface to the LLM (no fatal abort), and the
    # runner finally completes once the LLM stops asking.
    assert result.stop_reason != "tool_error"
    assert result.error is None
    assert result.final_content == "ok telling user"
    # The third+ attempts must have been escalated -- look at the events.
    escalated = [
        ev for ev in result.tool_events
        if ev["status"] == "error"
        and ev["detail"].startswith("workspace_violation_escalated:")
    ]
    assert escalated, (
        "expected at least one escalated workspace_violation event, got: "
        f"{result.tool_events}"
    )


@pytest.mark.asyncio
@@ -924,6 +1016,7 @@ async def test_runner_batches_read_only_tools_before_exclusive_work():
            ToolCallRequest(id="rw1", name="write_a", arguments={}),
        ],
        {},
        {},
    )
    assert shared_events[0:2] == ["start:read_a", "start:read_b"]
@@ -968,6 +1061,7 @@ async def test_runner_does_not_batch_exclusive_read_only_tools():
            ToolCallRequest(id="ro2", name="read_b", arguments={}),
        ],
        {},
        {},
    )
    assert shared_events[0] == "start:read_a"


@@ -242,13 +242,21 @@ def test_exec_extract_absolute_paths_captures_quoted_paths() -> None:
def test_exec_guard_blocks_home_path_outside_workspace(tmp_path) -> None:
    tool = ExecTool(restrict_to_workspace=True)
    error = tool._guard_command("cat ~/.nanobot/config.json", str(tmp_path))
    assert error is not None
    assert error.startswith(
        "Error: Command blocked by safety guard (path outside working dir)"
    )
    assert "hard policy boundary" in error


def test_exec_guard_blocks_quoted_home_path_outside_workspace(tmp_path) -> None:
    tool = ExecTool(restrict_to_workspace=True)
    error = tool._guard_command('cat "~/.nanobot/config.json"', str(tmp_path))
    assert error is not None
    assert error.startswith(
        "Error: Command blocked by safety guard (path outside working dir)"
    )
    assert "hard policy boundary" in error


def test_exec_guard_allows_media_path_outside_workspace(tmp_path, monkeypatch) -> None:
@@ -300,7 +308,11 @@ def test_exec_guard_blocks_windows_drive_root_outside_workspace(monkeypatch) -> None:
    tool = ExecTool(restrict_to_workspace=True)
    error = tool._guard_command("dir E:\\", "E:\\workspace")
    assert error is not None
    assert error.startswith(
        "Error: Command blocked by safety guard (path outside working dir)"
    )
    assert "hard policy boundary" in error


# --- cast_params tests ---


@@ -0,0 +1,120 @@
"""Tests for repeated_workspace_violation throttle and signature."""
from __future__ import annotations
from nanobot.utils.runtime import (
repeated_workspace_violation_error,
workspace_violation_signature,
)
def test_signature_for_filesystem_tools_uses_path_argument():
sig_a = workspace_violation_signature(
"read_file", {"path": "/Users/x/Downloads/01.md"}
)
sig_b = workspace_violation_signature(
"write_file", {"path": "/Users/x/Downloads/01.md"}
)
sig_c = workspace_violation_signature(
"edit_file", {"file_path": "/Users/x/Downloads/01.md"}
)
assert sig_a is not None
assert sig_a == sig_b == sig_c, (
"the throttle must collapse equivalent paths across different tools "
"so the LLM cannot bypass it by switching tool"
)
assert "/users/x/downloads/01.md" in sig_a
def test_signature_for_exec_extracts_first_absolute_path_in_command():
sig = workspace_violation_signature(
"exec",
{"command": "cat /Users/x/Downloads/01.md && echo done"},
)
assert sig is not None
assert "/users/x/downloads/01.md" in sig
def test_signature_collides_across_filesystem_and_exec_for_same_target():
"""LLM bypass loops jump tools (read_file -> exec cat). Throttle must
treat both attempts as targeting the same outside resource."""
fs_sig = workspace_violation_signature(
"read_file", {"path": "/Users/x/Downloads/01.md"}
)
exec_sig = workspace_violation_signature(
"exec", {"command": "cat /Users/x/Downloads/01.md"}
)
assert fs_sig == exec_sig
def test_signature_falls_back_to_working_dir_when_no_absolute_in_command():
sig = workspace_violation_signature(
"exec",
{"command": "ls -la", "working_dir": "/etc"},
)
assert sig is not None
assert "/etc" in sig
def test_signature_is_none_for_unknown_tool_with_no_path():
assert workspace_violation_signature("web_search", {"query": "anything"}) is None
assert workspace_violation_signature("exec", {"command": "echo hello"}) is None
def test_repeated_workspace_violation_returns_none_within_budget():
counts: dict[str, int] = {}
arguments = {"path": "/Users/x/Downloads/01.md"}
assert repeated_workspace_violation_error("read_file", arguments, counts) is None
assert repeated_workspace_violation_error("read_file", arguments, counts) is None
def test_repeated_workspace_violation_escalates_after_third_attempt():
counts: dict[str, int] = {}
arguments = {"path": "/Users/x/Downloads/01.md"}
repeated_workspace_violation_error("read_file", arguments, counts)
repeated_workspace_violation_error("read_file", arguments, counts)
third = repeated_workspace_violation_error("read_file", arguments, counts)
assert third is not None
assert "refusing repeated workspace-bypass" in third
assert "/users/x/downloads/01.md" in third
assert "ask how they want to proceed" in third
def test_repeated_workspace_violation_independent_per_target():
"""Different outside paths must each get their own retry budget."""
counts: dict[str, int] = {}
repeated_workspace_violation_error(
"read_file", {"path": "/Users/x/Downloads/01.md"}, counts,
)
repeated_workspace_violation_error(
"read_file", {"path": "/Users/x/Downloads/01.md"}, counts,
)
# Different target, fresh budget.
assert repeated_workspace_violation_error(
"read_file", {"path": "/Users/x/Documents/notes.md"}, counts,
) is None
def test_repeated_workspace_violation_collapses_tool_switching():
"""LLM switches from read_file to exec cat then to python -c open(...)
against the same path; the throttle must escalate on the third attempt."""
counts: dict[str, int] = {}
repeated_workspace_violation_error(
"read_file", {"path": "/Users/x/Downloads/01.md"}, counts,
)
repeated_workspace_violation_error(
"exec", {"command": "cat /Users/x/Downloads/01.md"}, counts,
)
third = repeated_workspace_violation_error(
"exec",
{"command": "python3 -c \"open('/Users/x/Downloads/01.md').read()\""},
counts,
)
assert third is not None
assert "refusing repeated workspace-bypass" in third