mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-06 17:55:59 +00:00
PR #3493 promoted every shell `_guard_command` rejection to a turn-fatal RuntimeError. The two heuristic outputs in that list -- `path outside working dir` and `path traversal detected` -- routinely false-positive on benign constructs (e.g. `2>/dev/null`, quoted `..` arguments to sed/find, absolute paths inside inline scripts), so legitimate workspace commands silently kill the user's turn (#3599) and the agent never gets a chance to retry with a different approach (#3605). Two changes, both narrowly scoped: - `ExecTool._guard_command` now skips a small allow-list of kernel device files (`/dev/null`, the standard streams, `/dev/random`, `/dev/fd/N`, ...) before the workspace path check, matched against the pre-resolve string so symlinks like `/dev/stderr -> /proc/self/fd/2` still hit the allow-list. Real outside writes such as `> /etc/issue` remain blocked. - `AgentRunner._WORKSPACE_BLOCK_MARKERS` keeps only the four hard path-resolution errors from filesystem.py / shell.py and the SSRF marker. The two heuristic substrings move out of the fatal list, so the LLM sees them as ordinary tool errors and can self-correct in the next iteration. SSRF stays fatal because retrying an internal URL with a different phrasing would defeat the safety boundary. Tests: - `tests/tools/test_exec_security.py`: parametrized regression for the exact #3599 command sample plus other stdio redirects and device reads; explicit negative case asserts `> /etc/issue` is still blocked. - `tests/agent/test_runner.py`: `_is_workspace_violation` no longer fatals on the two heuristic markers, plus an end-to-end case proving the runner hands the guard error back to the LLM and finalizes the next turn cleanly.
244 lines
8.6 KiB
Python
244 lines
8.6 KiB
Python
"""Tests for exec tool internal URL blocking."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import socket
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.agent.tools.shell import ExecTool
|
|
|
|
|
|
def _fake_resolve_private(hostname, port, family=0, type_=0):
|
|
return [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("169.254.169.254", 0))]
|
|
|
|
|
|
def _fake_resolve_localhost(hostname, port, family=0, type_=0):
|
|
return [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0))]
|
|
|
|
|
|
def _fake_resolve_public(hostname, port, family=0, type_=0):
|
|
return [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("93.184.216.34", 0))]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_blocks_curl_metadata():
|
|
tool = ExecTool()
|
|
with patch("nanobot.security.network.socket.getaddrinfo", _fake_resolve_private):
|
|
result = await tool.execute(
|
|
command='curl -s -H "Metadata-Flavor: Google" http://169.254.169.254/computeMetadata/v1/'
|
|
)
|
|
assert "Error" in result
|
|
assert "internal" in result.lower() or "private" in result.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_blocks_wget_localhost():
|
|
tool = ExecTool()
|
|
with patch("nanobot.security.network.socket.getaddrinfo", _fake_resolve_localhost):
|
|
result = await tool.execute(command="wget http://localhost:8080/secret -O /tmp/out")
|
|
assert "Error" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_allows_normal_commands():
|
|
tool = ExecTool(timeout=5)
|
|
result = await tool.execute(command="echo hello")
|
|
assert "hello" in result
|
|
assert "Error" not in result.split("\n")[0]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_allows_curl_to_public_url():
|
|
"""Commands with public URLs should not be blocked by the internal URL check."""
|
|
tool = ExecTool()
|
|
with patch("nanobot.security.network.socket.getaddrinfo", _fake_resolve_public):
|
|
guard_result = tool._guard_command("curl https://example.com/api", "/tmp")
|
|
assert guard_result is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_blocks_chained_internal_url():
|
|
"""Internal URLs buried in chained commands should still be caught."""
|
|
tool = ExecTool()
|
|
with patch("nanobot.security.network.socket.getaddrinfo", _fake_resolve_private):
|
|
result = await tool.execute(
|
|
command="echo start && curl http://169.254.169.254/latest/meta-data/ && echo done"
|
|
)
|
|
assert "Error" in result
|
|
|
|
|
|
# --- #2989: block writes to nanobot internal state files -----------------
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"command",
|
|
[
|
|
"cat foo >> history.jsonl",
|
|
"echo '{}' > history.jsonl",
|
|
"echo '{}' > memory/history.jsonl",
|
|
"echo '{}' > ./workspace/memory/history.jsonl",
|
|
"tee -a history.jsonl < foo",
|
|
"tee history.jsonl",
|
|
"cp /tmp/fake.jsonl history.jsonl",
|
|
"mv backup.jsonl memory/history.jsonl",
|
|
"dd if=/dev/zero of=memory/history.jsonl",
|
|
"sed -i 's/old/new/' history.jsonl",
|
|
"echo x > .dream_cursor",
|
|
"cp /tmp/x memory/.dream_cursor",
|
|
],
|
|
)
|
|
def test_exec_blocks_writes_to_history_jsonl(command):
|
|
"""Direct writes to history.jsonl / .dream_cursor must be blocked (#2989)."""
|
|
tool = ExecTool()
|
|
result = tool._guard_command(command, "/tmp")
|
|
assert result is not None
|
|
assert "deny pattern filter" in result.lower()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"command",
|
|
[
|
|
"cat history.jsonl",
|
|
"wc -l history.jsonl",
|
|
"tail -n 5 history.jsonl",
|
|
"grep foo history.jsonl",
|
|
"cp history.jsonl /tmp/history.backup",
|
|
"ls memory/",
|
|
"echo history.jsonl",
|
|
],
|
|
)
|
|
def test_exec_allows_reads_of_history_jsonl(command):
|
|
"""Read-only access to history.jsonl must still be allowed."""
|
|
tool = ExecTool()
|
|
result = tool._guard_command(command, "/tmp")
|
|
assert result is None
|
|
|
|
|
|
# --- #2826: working_dir must not escape the configured workspace ---------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_blocks_working_dir_outside_workspace(tmp_path):
|
|
"""An LLM-supplied working_dir outside the workspace must be rejected."""
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
tool = ExecTool(working_dir=str(workspace), restrict_to_workspace=True)
|
|
result = await tool.execute(command="rm calendar.ics", working_dir="/etc")
|
|
assert "outside the configured workspace" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_blocks_absolute_rm_via_hijacked_working_dir(tmp_path):
|
|
"""Regression for #2826: `rm /abs/path` via working_dir hijack."""
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
victim_dir = tmp_path / "outside"
|
|
victim_dir.mkdir()
|
|
victim = victim_dir / "file.ics"
|
|
victim.write_text("data")
|
|
|
|
tool = ExecTool(working_dir=str(workspace), restrict_to_workspace=True)
|
|
result = await tool.execute(
|
|
command=f"rm {victim}",
|
|
working_dir=str(victim_dir),
|
|
)
|
|
assert "outside the configured workspace" in result
|
|
assert victim.exists(), "victim file must not have been deleted"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_allows_working_dir_within_workspace(tmp_path):
|
|
"""A working_dir that is a subdirectory of the workspace is fine."""
|
|
workspace = tmp_path / "workspace"
|
|
subdir = workspace / "project"
|
|
subdir.mkdir(parents=True)
|
|
tool = ExecTool(working_dir=str(workspace), restrict_to_workspace=True, timeout=5)
|
|
result = await tool.execute(command="echo ok", working_dir=str(subdir))
|
|
assert "ok" in result
|
|
assert "outside the configured workspace" not in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_allows_working_dir_equal_to_workspace(tmp_path):
|
|
"""Passing working_dir equal to the workspace root must be allowed."""
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
tool = ExecTool(working_dir=str(workspace), restrict_to_workspace=True, timeout=5)
|
|
result = await tool.execute(command="echo ok", working_dir=str(workspace))
|
|
assert "ok" in result
|
|
assert "outside the configured workspace" not in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_ignores_workspace_check_when_not_restricted(tmp_path):
|
|
"""Without restrict_to_workspace, the LLM may still choose any working_dir."""
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
other = tmp_path / "other"
|
|
other.mkdir()
|
|
tool = ExecTool(working_dir=str(workspace), restrict_to_workspace=False, timeout=5)
|
|
result = await tool.execute(command="echo ok", working_dir=str(other))
|
|
assert "ok" in result
|
|
assert "outside the configured workspace" not in result
|
|
|
|
|
|
# --- #3599: stdio redirects to /dev/null must not trip the workspace guard ----
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"command",
|
|
[
|
|
# The exact command from the #3599 reporter.
|
|
'rm test_print.txt 2>/dev/null; echo "done"',
|
|
# Plain redirect of stdout / stderr.
|
|
"find . -type f >/dev/null",
|
|
"noisy_cmd 2>/dev/null",
|
|
"noisy_cmd >/dev/null 2>&1",
|
|
# Read from /dev/urandom is also a benign device read.
|
|
"head -c 16 /dev/urandom | xxd",
|
|
"echo done >/dev/stderr",
|
|
"echo line </dev/stdin",
|
|
# Per-process FD aliases never escape the workspace.
|
|
"cat /dev/fd/3",
|
|
],
|
|
)
|
|
def test_exec_allows_benign_device_targets_inside_workspace(tmp_path, command):
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
tool = ExecTool(working_dir=str(workspace), restrict_to_workspace=True)
|
|
assert tool._guard_command(command, str(workspace)) is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exec_3599_regression_rm_with_dev_null_redirect(tmp_path):
|
|
"""#3599: ``rm <ws-path> 2>/dev/null`` must succeed against the workspace guard."""
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
target = workspace / "test_print.txt"
|
|
target.write_text("scratch")
|
|
tool = ExecTool(working_dir=str(workspace), restrict_to_workspace=True, timeout=5)
|
|
result = await tool.execute(
|
|
command=f'rm {target} 2>/dev/null; echo "done"',
|
|
working_dir=str(workspace),
|
|
)
|
|
assert "done" in result
|
|
assert "path outside working dir" not in result
|
|
assert not target.exists()
|
|
|
|
|
|
def test_exec_still_blocks_real_outside_path_via_redirect(tmp_path):
|
|
"""Redirect *targets* outside the workspace (not /dev/...) must still be blocked.
|
|
|
|
We only whitelist kernel device files; arbitrary outside redirects such as
|
|
``> /etc/issue`` should remain caught by the workspace guard so a buggy
|
|
LLM cannot exfiltrate data outside the workspace via stderr redirection.
|
|
"""
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
tool = ExecTool(working_dir=str(workspace), restrict_to_workspace=True)
|
|
blocked = tool._guard_command("echo pwn > /etc/issue", str(workspace))
|
|
assert blocked is not None
|
|
assert "path outside working dir" in blocked
|