nanobot/tests/agent/test_runner_safety.py
chengyongru 99cc6ee808 test(agent): expand coverage and refactor test structure
- Add 42 tests for ContextBuilder (context.py: 0→42 tests)
- Add 37 tests for SubagentManager lifecycle (subagent.py: 2→37 tests)
- Add 42 unit tests for AutoCompact in isolation
- Split monolithic test_runner.py (3313 lines) into 9 focused files:
  test_runner_core, test_runner_hooks, test_runner_errors,
  test_runner_safety, test_runner_persistence, test_runner_governance,
  test_runner_tool_execution, test_runner_injections,
  test_loop_runner_integration
- Add 3 config passthrough tests (temperature/max_tokens/reasoning_effort)
- Fix fragile patch.object(__init__) in test_stop_preserves_context
- Create shared conftest.py with make_provider/make_loop factories

Total: 934 tests passing, 0 regressions
2026-05-13 12:49:17 +08:00

245 lines
9.2 KiB
Python

"""Tests for AgentRunner security: workspace violations, SSRF, shell guard, throttling."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from nanobot.config.schema import AgentDefaults
from nanobot.providers.base import LLMResponse, ToolCallRequest
# Tool-result size limit read from a default-constructed AgentDefaults;
# every AgentRunSpec built in this module passes it as max_tool_result_chars.
_MAX_TOOL_RESULT_CHARS = AgentDefaults().max_tool_result_chars
@pytest.mark.asyncio
async def test_runner_does_not_abort_on_workspace_violation_anymore():
    """v2 behavior: workspace-bound rejections are *soft* tool errors.

    Previously (PR #3493) any workspace boundary error became a fatal
    RuntimeError that aborted the turn. That silently killed legitimate
    workspace commands once the heuristic guard misfired (#3599 #3605), so
    we now hand the error back to the LLM as a recoverable tool result and
    rely on ``repeated_workspace_violation_error`` to throttle bypass loops.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    # Scripted provider: the first reply asks to read a file outside the
    # workspace, the second reply is the final answer with no tool calls.
    provider = MagicMock()
    provider.chat_with_retry = AsyncMock(side_effect=[
        LLMResponse(
            content="trying outside",
            tool_calls=[ToolCallRequest(
                id="call_1", name="read_file", arguments={"path": "/tmp/outside.md"},
            )],
        ),
        LLMResponse(content="ok, telling the user instead", tool_calls=[]),
    ])

    # Every tool execution raises the workspace-boundary PermissionError.
    tools = MagicMock()
    tools.get_definitions.return_value = []
    tools.execute = AsyncMock(
        side_effect=PermissionError(
            "Path /tmp/outside.md is outside allowed directory /workspace"
        )
    )

    runner = AgentRunner(provider)
    result = await runner.run(AgentRunSpec(
        initial_messages=[],
        tools=tools,
        model="test-model",
        max_iterations=3,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    ))

    # Two provider calls prove the loop continued after the tool error.
    assert provider.chat_with_retry.await_count == 2, (
        "workspace violation must NOT short-circuit the loop"
    )
    assert result.stop_reason != "tool_error"
    assert result.error is None
    assert result.final_content == "ok, telling the user instead"
    assert result.tool_events and result.tool_events[0]["status"] == "error"
    # Detail still carries the workspace_violation breadcrumb for telemetry,
    # but the runner did not raise.
    assert "workspace_violation" in result.tool_events[0]["detail"]
def test_is_ssrf_violation_recognizes_private_url_blocks():
    """Only private/internal URL rejections count as SSRF violations."""
    from nanobot.agent.runner import AgentRunner

    classify = AgentRunner._is_ssrf_violation

    # Messages that must be classified as SSRF.
    ssrf_messages = (
        "Error: Command blocked by safety guard (internal/private URL detected)",
        "URL validation failed: Blocked: host resolves to private/internal address 192.168.1.2",
    )
    for message in ssrf_messages:
        assert classify(message) is True

    # Workspace-bound markers and deny / allowlist filter messages must NOT
    # be classified as SSRF.
    non_ssrf_messages = (
        "Error: Command blocked by safety guard (path outside working dir)",
        "Path /tmp/x is outside allowed directory /ws",
        "Error: Command blocked by deny pattern filter",
    )
    for message in non_ssrf_messages:
        assert classify(message) is False
@pytest.mark.asyncio
async def test_runner_returns_non_retryable_hint_on_ssrf_violation():
    """An SSRF block is reported back to the LLM with a do-not-retry hint.

    The run still completes normally: the model receives the blocked tool
    result and answers on the following iteration.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    blocked_result = (
        "Error: Command blocked by safety guard (internal/private URL detected)"
    )
    final_answer = "I cannot access that private URL. Please share local files."

    # First reply issues the metadata-endpoint curl; second reply gives up.
    ssrf_attempt = LLMResponse(
        content="curl-ing metadata",
        tool_calls=[ToolCallRequest(
            id="call_ssrf",
            name="exec",
            arguments={"command": "curl http://169.254.169.254"},
        )],
    )
    closing_reply = LLMResponse(
        content="I cannot access that private URL. Please share local files.",
        tool_calls=[],
    )

    provider = MagicMock()
    provider.chat_with_retry = AsyncMock(side_effect=[ssrf_attempt, closing_reply])

    tools = MagicMock()
    tools.get_definitions.return_value = []
    tools.execute = AsyncMock(return_value=blocked_result)

    spec = AgentRunSpec(
        initial_messages=[],
        tools=tools,
        model="test-model",
        max_iterations=3,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    )
    result = await AgentRunner(provider).run(spec)

    assert provider.chat_with_retry.await_count == 2
    assert result.stop_reason == "completed"
    assert result.error is None
    assert result.final_content == final_answer

    assert result.tool_events
    assert result.tool_events[0]["detail"].startswith("ssrf_violation:")

    # The tool message handed to the LLM must carry every part of the hint.
    tool_messages = [msg for msg in result.messages if msg.get("role") == "tool"]
    assert tool_messages
    first_tool_content = tool_messages[0]["content"]
    for fragment in (
        "non-bypassable security boundary",
        "Do not retry",
        "tools.ssrfWhitelist",
    ):
        assert fragment in first_tool_content
@pytest.mark.asyncio
async def test_runner_lets_llm_recover_from_shell_guard_path_outside():
    """Reporter scenario for #3599 / #3605: the guard fires, the agent recovers.

    The shell ``_guard_command`` heuristic trips on ``2>/dev/null``-style
    redirects and similar shell idioms. Before v2 that aborted the whole
    turn (silent hang on Telegram per #3605); now the soft error is returned
    to the LLM, which can finalize on the next iteration.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    provider = MagicMock()
    captured_second_call: list[dict] = []

    async def scripted_chat(*, messages, **kwargs):
        # Any call after the first records the messages it received and
        # returns the final answer.
        if provider.chat_with_retry.await_count != 1:
            captured_second_call[:] = list(messages)
            return LLMResponse(content="recovered final answer", tool_calls=[])
        # First call: request the command that the guard will block.
        blocked_call = ToolCallRequest(
            id="call_blocked",
            name="exec",
            arguments={"command": "rm scratch.txt 2>/dev/null"},
        )
        return LLMResponse(content="trying noisy cleanup", tool_calls=[blocked_call])

    provider.chat_with_retry = AsyncMock(side_effect=scripted_chat)

    tools = MagicMock()
    tools.get_definitions.return_value = []
    tools.execute = AsyncMock(
        return_value="Error: Command blocked by safety guard (path outside working dir)"
    )

    spec = AgentRunSpec(
        initial_messages=[],
        tools=tools,
        model="test-model",
        max_iterations=3,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    )
    result = await AgentRunner(provider).run(spec)

    assert provider.chat_with_retry.await_count == 2, (
        "guard hit must NOT short-circuit the loop -- LLM should get a second turn"
    )
    assert result.stop_reason != "tool_error"
    assert result.error is None
    assert result.final_content == "recovered final answer"

    assert result.tool_events
    first_event = result.tool_events[0]
    assert first_event["status"] == "error"
    # v2: detail keeps the breadcrumb but the runner did not raise.
    assert "workspace_violation" in first_event["detail"]
@pytest.mark.asyncio
async def test_runner_throttles_repeated_workspace_bypass_attempts():
    """#3493 motivation: break the LLM bypass loop without aborting the turn.

    The LLM keeps retrying (read_file -> exec cat -> python -c open(...))
    against the same outside path. Once the soft retry budget is used up the
    runner swaps the tool result for a hard "stop trying" message so the
    model gives up and surfaces the boundary to the user.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    # Four scripted bypass attempts followed by one final plain answer.
    scripted: list[LLMResponse] = []
    for attempt in range(4):
        call = ToolCallRequest(
            id=f"a{attempt}", name="exec",
            arguments={"command": f"cat /Users/x/Downloads/01.md # try {attempt}"},
        )
        scripted.append(LLMResponse(content=f"try {attempt}", tool_calls=[call]))
    scripted.append(LLMResponse(content="ok telling user", tool_calls=[]))

    provider = MagicMock()
    provider.chat_with_retry = AsyncMock(side_effect=scripted)

    tools = MagicMock()
    tools.get_definitions.return_value = []
    tools.execute = AsyncMock(
        return_value="Error: Command blocked by safety guard (path outside working dir)"
    )

    spec = AgentRunSpec(
        initial_messages=[],
        tools=tools,
        model="test-model",
        max_iterations=10,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    )
    result = await AgentRunner(provider).run(spec)

    # All 4 bypass attempts surface to the LLM (no fatal abort), and the
    # runner finally completes once the LLM stops asking.
    assert result.stop_reason != "tool_error"
    assert result.error is None
    assert result.final_content == "ok telling user"

    def is_escalated(event):
        # An escalated event is an error whose detail carries the prefix.
        return (
            event["status"] == "error"
            and event["detail"].startswith("workspace_violation_escalated:")
        )

    # The third+ attempts must have been escalated -- look at the events.
    escalated = list(filter(is_escalated, result.tool_events))
    assert escalated, (
        "expected at least one escalated workspace_violation event, got: "
        f"{result.tool_events}"
    )