fix(agent): tighten safety guard edge cases

Keep the /dev workspace guard exception scoped to the known benign device paths already handled by ExecTool, and add coverage that non-benign /dev targets still get blocked. Also add a streaming regression for tool_error responses so fatal tool failures are delivered by channels instead of being marked as already streamed.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Xubin Ren 2026-05-03 17:23:23 +00:00 committed by Xubin Ren
parent d3689d143c
commit 614b21368f
3 changed files with 46 additions and 3 deletions

View File

@ -349,14 +349,11 @@ class ExecTool(Tool):
continue
media_path = get_media_dir().resolve()
dev_path = Path("/dev").resolve()
if (p.is_absolute()
and cwd_path not in p.parents
and p != cwd_path
and media_path not in p.parents
and p != media_path
and dev_path not in p.parents
and p != dev_path
):
return (
"Error: Command blocked by safety guard (path outside working dir)"

View File

@ -1289,6 +1289,45 @@ async def test_streamed_flag_not_set_on_llm_error(tmp_path):
"_streamed must not be set when stop_reason is error"
@pytest.mark.asyncio
async def test_streamed_flag_not_set_on_tool_error(tmp_path):
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
tool_call_resp = LLMResponse(
content="checking metadata",
tool_calls=[ToolCallRequest(
id="call_ssrf",
name="exec",
arguments={"command": "curl http://169.254.169.254/latest/meta-data/"},
)],
usage={},
)
provider.chat_stream_with_retry = AsyncMock(return_value=tool_call_resp)
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
loop.tools.get_definitions = MagicMock(return_value=[])
loop.tools.prepare_call = MagicMock(return_value=(None, {}, None))
loop.tools.execute = AsyncMock(return_value=(
"Error: Command blocked by safety guard (internal/private URL detected)"
))
result = await loop._process_message(
InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="hi"),
on_stream=AsyncMock(),
on_stream_end=AsyncMock(),
)
assert result is not None
assert "internal/private URL detected" in result.content
assert not result.metadata.get("_streamed"), \
"_streamed must not be set when stop_reason is tool_error"
@pytest.mark.asyncio
async def test_next_turn_after_llm_error_keeps_turn_boundary(tmp_path):
from nanobot.agent.loop import AgentLoop

View File

@ -330,6 +330,13 @@ def test_exec_guard_allows_dev_urandom(tmp_path) -> None:
assert error is None
def test_exec_guard_blocks_non_benign_dev_path(tmp_path) -> None:
tool = ExecTool(restrict_to_workspace=True)
error = tool._guard_command("cat /dev/sda", str(tmp_path))
assert error is not None
assert "path outside working dir" in error
def test_exec_extract_absolute_paths_ignores_pipe_tilde() -> None:
cmd = "python query.py --query '{job=\"app\"} |~ \"error\"'"
paths = ExecTool._extract_absolute_paths(cmd)