diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index c2e15bf8a..b9418045e 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -776,8 +776,6 @@ class AgentRunner: handled = self._classify_violation( raw_text=prep_error, soft_payload=prep_error + hint, - ssrf_payload=prep_error, - ssrf_error=RuntimeError(prep_error), event=event, tool_call=tool_call, workspace_violation_counts=workspace_violation_counts, @@ -808,8 +806,6 @@ class AgentRunner: raw_text=str(exc), # Preserve legacy exception payloads without the retry hint. soft_payload=payload, - ssrf_payload=payload, - ssrf_error=exc, event=event, tool_call=tool_call, workspace_violation_counts=workspace_violation_counts, @@ -829,8 +825,6 @@ class AgentRunner: handled = self._classify_violation( raw_text=result, soft_payload=result + hint, - ssrf_payload=result, - ssrf_error=RuntimeError(result), event=event, tool_call=tool_call, workspace_violation_counts=workspace_violation_counts, @@ -849,8 +843,21 @@ class AgentRunner: detail = detail[:120] + "..." return result, {"name": tool_call.name, "status": "ok", "detail": detail}, None - # SSRF remains fatal; workspace path boundaries are soft + throttled. - _SSRF_MARKER: str = "internal/private url detected" + # SSRF is a hard security block at the tool boundary, but the agent turn + # should recover conversationally instead of aborting the runtime. + _SSRF_MARKERS: tuple[str, ...] = ( + "internal/private url detected", + "private/internal address", + "private address", + ) + _SSRF_BOUNDARY_NOTE: str = ( + "This is a non-bypassable security boundary. Stop trying to access " + "private/internal URLs. Do not retry with curl, wget, encoded IPs, " + "alternate DNS, redirects, proxies, or another tool. Ask the user for " + "local files, logs, screenshots, or an explicit safe public URL instead. " + "If the user explicitly trusts this private URL, ask them to whitelist " + "the exact IP/CIDR via tools.ssrfWhitelist." + ) # Non-SSRF boundary markers returned to the LLM as recoverable tool errors. _WORKSPACE_VIOLATION_MARKERS: tuple[str, ...] = ( @@ -864,7 +871,10 @@ class AgentRunner: @classmethod def _is_ssrf_violation(cls, text: str) -> bool: - return bool(text) and cls._SSRF_MARKER in text.lower() + if not text: + return False + lowered = text.lower() + return any(marker in lowered for marker in cls._SSRF_MARKERS) @classmethod def _is_workspace_violation(cls, text: str) -> bool: @@ -872,7 +882,7 @@ class AgentRunner: if not text: return False lowered = text.lower() - if cls._SSRF_MARKER in lowered: + if cls._is_ssrf_violation(lowered): return True return any(marker in lowered for marker in cls._WORKSPACE_VIOLATION_MARKERS) @@ -881,8 +891,6 @@ class AgentRunner: *, raw_text: str, soft_payload: str, - ssrf_payload: str, - ssrf_error: BaseException, event: dict[str, str], tool_call: ToolCallRequest, workspace_violation_counts: dict[str, int], @@ -890,12 +898,12 @@ class AgentRunner: """Classify safety-boundary failures, or return ``None`` to pass through.""" if self._is_ssrf_violation(raw_text): logger.warning( - "Tool {} blocked by SSRF guard; aborting turn: {}", + "Tool {} blocked by SSRF guard; returning non-retryable tool error: {}", tool_call.name, raw_text.replace("\n", " ").strip()[:200], ) - event["detail"] = self._event_detail("workspace_violation: ", raw_text) - return ssrf_payload, event, ssrf_error + event["detail"] = self._event_detail("ssrf_violation: ", raw_text) + return self._ssrf_soft_payload(raw_text), event, None if self._is_workspace_violation(raw_text): escalation = repeated_workspace_violation_error( @@ -918,6 +926,11 @@ class AgentRunner: return None + @classmethod + def _ssrf_soft_payload(cls, raw_text: str) -> str: + text = raw_text.strip() or "Error: request blocked by SSRF guard" + return f"{text}\n\n{cls._SSRF_BOUNDARY_NOTE}" + @staticmethod def _event_detail(prefix: str, text: str, limit: int = 160) -> str: return (prefix + text.replace("\n", " ").strip())[:limit] diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index 17451432a..44767e97a 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -321,7 +321,7 @@ class ExecTool(Tool): from nanobot.security.network import contains_internal_url if contains_internal_url(cmd): - # SSRF stays fatal in the runner, so keep this marker direct. + # The runner turns this marker into a non-retryable security hint. return "Error: Command blocked by safety guard (internal/private URL detected)" if self.restrict_to_workspace: diff --git a/tests/agent/test_runner.py b/tests/agent/test_runner.py index 27ee2b065..0be615cb9 100644 --- a/tests/agent/test_runner.py +++ b/tests/agent/test_runner.py @@ -364,17 +364,15 @@ async def test_runner_does_not_abort_on_workspace_violation_anymore(): assert "workspace_violation" in result.tool_events[0]["detail"] -def test_is_ssrf_violation_remains_fatal(): - """SSRF rejections are the only marker that stays turn-fatal. - - A single successful internal-URL fetch can leak cloud metadata, so we - never let the LLM "retry" with a different URL phrasing -- contrast - this with workspace-bound rejections which are soft + throttled in v2. - """ +def test_is_ssrf_violation_recognizes_private_url_blocks(): + """SSRF rejections are classified separately from workspace boundaries.""" from nanobot.agent.runner import AgentRunner ssrf_msg = "Error: Command blocked by safety guard (internal/private URL detected)" assert AgentRunner._is_ssrf_violation(ssrf_msg) is True + assert AgentRunner._is_ssrf_violation( + "URL validation failed: Blocked: host resolves to private/internal address 192.168.1.2" + ) is True # Workspace-bound markers are NOT classified as SSRF. assert AgentRunner._is_ssrf_violation( @@ -390,8 +388,8 @@ def test_is_ssrf_violation_remains_fatal(): @pytest.mark.asyncio -async def test_runner_aborts_on_ssrf_violation(): - """SSRF still fatal-aborts the turn even though workspace ones are soft.""" +async def test_runner_returns_non_retryable_hint_on_ssrf_violation(): + """SSRF stays blocked, but the runtime gives the LLM a final chance to recover.""" from nanobot.agent.runner import AgentRunSpec, AgentRunner provider = MagicMock() @@ -404,7 +402,10 @@ async def test_runner_aborts_on_ssrf_violation(): arguments={"command": "curl http://169.254.169.254"}, )], ), - LLMResponse(content="should NOT be reached", tool_calls=[]), + LLMResponse( + content="I cannot access that private URL. Please share local files.", + tool_calls=[], + ), ]) tools = MagicMock() tools.get_definitions.return_value = [] @@ -421,9 +422,16 @@ async def test_runner_aborts_on_ssrf_violation(): max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, )) - assert provider.chat_with_retry.await_count == 1, "SSRF must abort immediately" - assert result.stop_reason == "tool_error" - assert "internal/private url detected" in (result.error or "").lower() + assert provider.chat_with_retry.await_count == 2 + assert result.stop_reason == "completed" + assert result.error is None + assert result.final_content == "I cannot access that private URL. Please share local files." + assert result.tool_events and result.tool_events[0]["detail"].startswith("ssrf_violation:") + tool_messages = [m for m in result.messages if m.get("role") == "tool"] + assert tool_messages + assert "non-bypassable security boundary" in tool_messages[0]["content"] + assert "Do not retry" in tool_messages[0]["content"] + assert "tools.ssrfWhitelist" in tool_messages[0]["content"] @pytest.mark.asyncio @@ -1290,7 +1298,7 @@ async def test_streamed_flag_not_set_on_llm_error(tmp_path): @pytest.mark.asyncio -async def test_streamed_flag_not_set_on_tool_error(tmp_path): +async def test_ssrf_soft_block_can_finalize_after_streamed_tool_call(tmp_path): from nanobot.agent.loop import AgentLoop from nanobot.bus.events import InboundMessage from nanobot.bus.queue import MessageBus @@ -1307,7 +1315,14 @@ async def test_streamed_flag_not_set_on_tool_error(tmp_path): )], usage={}, ) - provider.chat_stream_with_retry = AsyncMock(return_value=tool_call_resp) + provider.chat_stream_with_retry = AsyncMock(side_effect=[ + tool_call_resp, + LLMResponse( + content="I cannot access private URLs. Please share the local file.", + tool_calls=[], + usage={}, + ), + ]) loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model") loop.tools.get_definitions = MagicMock(return_value=[]) @@ -1323,9 +1338,8 @@ async def test_streamed_flag_not_set_on_tool_error(tmp_path): ) assert result is not None - assert "internal/private URL detected" in result.content - assert not result.metadata.get("_streamed"), \ - "_streamed must not be set when stop_reason is tool_error" + assert result.content == "I cannot access private URLs. Please share the local file." + assert result.metadata.get("_streamed") is True @pytest.mark.asyncio