nanobot/nanobot/utils/runtime.py
Xubin Ren 2a7433b7ec chore(runner): tighten workspace guard comments and Windows tests
Keep the workspace-boundary changes easier to review by trimming long explanatory comments down to short local notes. Also make the #3599 POSIX command regression skip on Windows and normalize workspace violation signatures to POSIX separators so the throttle tests are platform-stable.

Tests:
- uv run pytest tests/tools/test_exec_security.py tests/utils/test_workspace_violation_throttle.py -q
- uv run pytest -q

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-04 01:18:39 +08:00

171 lines
6.0 KiB
Python

"""Runtime-specific helper functions and constants."""
from __future__ import annotations
import re
from pathlib import Path
from typing import Any
from loguru import logger
from nanobot.utils.helpers import stringify_text_blocks
# Number of identical web_fetch/web_search lookups (same signature) that are
# let through before repeated_external_lookup_error() starts blocking them.
_MAX_REPEAT_EXTERNAL_LOOKUPS = 2
# Third same-target workspace violation in a turn escalates to "stop retrying".
_MAX_REPEAT_WORKSPACE_VIOLATIONS = 2
# Fallback text for the case where tool steps ran but no final answer exists.
# NOTE(review): not referenced in this part of the file -- confirm the caller.
EMPTY_FINAL_RESPONSE_MESSAGE = (
    "I completed the tool steps but couldn't produce a final answer. "
    "Please try again or narrow the task."
)
# Content of the user-role message built by build_finalization_retry_message().
FINALIZATION_RETRY_PROMPT = (
    "Please provide your response to the user based on the conversation above."
)
# Content of the user-role message built by build_length_recovery_message().
LENGTH_RECOVERY_PROMPT = (
    "Output limit reached. Continue exactly where you left off "
    "— no recap, no apology. Break remaining work into smaller steps if needed."
)
def empty_tool_result_message(tool_name: str) -> str:
    """Build the placeholder text for a tool that finished without visible output.

    The tool name is embedded verbatim so the marker says which tool ran.
    """
    marker = f"({tool_name} completed with no output)"
    return marker
def ensure_nonempty_tool_result(tool_name: str, content: Any) -> Any:
    """Swap a semantically empty tool result for a short marker string.

    A result counts as empty when it is ``None``, a whitespace-only string,
    an empty list, or a list whose text rendering (as produced by
    ``stringify_text_blocks``) is whitespace-only. Anything else is returned
    unchanged.
    """
    if content is None:
        is_empty = True
    elif isinstance(content, str):
        is_empty = not content.strip()
    elif isinstance(content, list):
        if content:
            # A None rendering means "no text view available": keep the list.
            flattened = stringify_text_blocks(content)
            is_empty = flattened is not None and not flattened.strip()
        else:
            is_empty = True
    else:
        is_empty = False

    return empty_tool_result_message(tool_name) if is_empty else content
def is_blank_text(content: str | None) -> bool:
"""True when *content* is missing or only whitespace."""
return content is None or not content.strip()
def build_finalization_retry_message() -> dict[str, str]:
    """Return the user-role message that asks the model for its final answer."""
    retry_message = dict(role="user", content=FINALIZATION_RETRY_PROMPT)
    return retry_message
def build_length_recovery_message() -> dict[str, str]:
    """Return the user-role message that asks the model to continue its output."""
    recovery_message = dict(role="user", content=LENGTH_RECOVERY_PROMPT)
    return recovery_message
def external_lookup_signature(tool_name: str, arguments: dict[str, Any]) -> str | None:
"""Stable signature for repeated external lookups we want to throttle."""
if tool_name == "web_fetch":
url = str(arguments.get("url") or "").strip()
if url:
return f"web_fetch:{url.lower()}"
if tool_name == "web_search":
query = str(arguments.get("query") or arguments.get("search_term") or "").strip()
if query:
return f"web_search:{query.lower()}"
return None
def repeated_external_lookup_error(
    tool_name: str,
    arguments: dict[str, Any],
    seen_counts: dict[str, int],
) -> str | None:
    """Refuse an external lookup once its retry budget has been used up.

    Each call that yields a lookup signature increments that signature's
    entry in *seen_counts* (mutated in place). Returns ``None`` while the
    count is within ``_MAX_REPEAT_EXTERNAL_LOOKUPS`` or when the call has no
    signature; otherwise logs a warning and returns an error string.
    """
    lookup_key = external_lookup_signature(tool_name, arguments)
    if lookup_key is None:
        return None

    attempts = seen_counts.get(lookup_key, 0) + 1
    seen_counts[lookup_key] = attempts

    if attempts > _MAX_REPEAT_EXTERNAL_LOOKUPS:
        # Truncate the signature so an oversized URL/query cannot flood the log.
        logger.warning(
            "Blocking repeated external lookup {} on attempt {}",
            lookup_key[:160],
            attempts,
        )
        return (
            "Error: repeated external lookup blocked. "
            "Use the results you already have to answer, or try a meaningfully different source."
        )
    return None
# Workspace-boundary violations are soft errors, with per-target throttling.
# The pattern below picks a path-like token out of a shell command string: the
# token must start with "/" or "~" and be preceded by start-of-string,
# whitespace, "|", ">" or a quote. Group 1 captures the token up to the next
# whitespace, quote, ">", ";", "|" or "<".
_OUTSIDE_PATH_PATTERN = re.compile(r"(?:^|[\s|>'\"])((?:/[^\s\"'>;|<]+)|(?:~[^\s\"'>;|<]+))")
def workspace_violation_signature(
    tool_name: str,
    arguments: dict[str, Any],
) -> str | None:
    """Compute a tool-independent key for the target of a workspace violation.

    Lookup order:

    1. The first non-blank string among the ``path``, ``file_path``,
       ``target``, ``source`` and ``destination`` arguments.
    2. For the ``exec`` / ``shell`` tools only: the first path-like token
       found in ``command``, then the ``working_dir`` argument.

    Returns ``None`` when no target can be identified.
    """
    path_keys = ("path", "file_path", "target", "source", "destination")
    explicit_target = next(
        (
            value.strip()
            for value in map(arguments.get, path_keys)
            if isinstance(value, str) and value.strip()
        ),
        None,
    )
    if explicit_target is not None:
        return _normalize_violation_target(explicit_target)

    if tool_name not in {"exec", "shell"}:
        return None

    command = str(arguments.get("command") or "").strip()
    found = _OUTSIDE_PATH_PATTERN.search(command) if command else None
    if found:
        return _normalize_violation_target(found.group(1))

    working_dir = str(arguments.get("working_dir") or "").strip()
    return _normalize_violation_target(working_dir) if working_dir else None
def _normalize_violation_target(raw: str) -> str:
    """Map *raw* to a canonical key so equivalent path spellings coincide.

    The path is user-expanded, resolved and rendered with POSIX separators.
    If that fails for any reason, the raw text is used with backslashes
    turned into forward slashes. The result is prefixed with ``violation:``
    and lower-cased as a whole.
    """
    try:
        expanded = Path(raw).expanduser()
        canonical = expanded.resolve().as_posix()
    except Exception:
        # Best-effort fallback: still produce a usable, separator-stable key.
        canonical = raw.replace("\\", "/")
    return ("violation:" + canonical).lower()
def repeated_workspace_violation_error(
    tool_name: str,
    arguments: dict[str, Any],
    seen_counts: dict[str, int],
) -> str | None:
    """Produce a firm "stop retrying" error for repeated bypass attempts.

    Each call that yields a violation signature increments that signature's
    entry in *seen_counts* (mutated in place). Returns ``None`` while the
    count is within ``_MAX_REPEAT_WORKSPACE_VIOLATIONS`` or when no target
    can be identified; otherwise logs a warning and returns the escalated
    error text naming the target and the attempt count.
    """
    violation_key = workspace_violation_signature(tool_name, arguments)
    if violation_key is None:
        return None

    attempts = seen_counts.get(violation_key, 0) + 1
    seen_counts[violation_key] = attempts
    if attempts <= _MAX_REPEAT_WORKSPACE_VIOLATIONS:
        return None

    logger.warning(
        "Escalating repeated workspace bypass attempt {} (attempt {})",
        violation_key[:160],
        attempts,
    )

    # Drop the "violation:" marker so the message shows only the path.
    _, marker, remainder = violation_key.partition("violation:")
    target = remainder if marker else violation_key

    return (
        "Error: refusing repeated workspace-bypass attempts.\n"
        f"You have tried to access '{target}' (or an equivalent path) "
        f"{attempts} times in this turn. This is a hard policy boundary -- "
        "switching tools, shell tricks, working_dir overrides, symlinks, "
        "or base64 piping will NOT change the answer. Stop retrying. "
        "If the user genuinely needs this resource, tell them you cannot "
        "access it and ask how they want to proceed (e.g. copy the file "
        "into the workspace, or disable restrict_to_workspace for this run)."
    )