mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-06 09:45:51 +00:00
fix(agent): prevent safety guard false positives and streamed message drop
Three independent fixes for issues exposed by PR #3493: 1. shell.py: allow /dev/* paths in workspace guard Commands like `rm file.txt 2>/dev/null` were blocked because _extract_absolute_paths captured /dev/null as a path outside the workspace. Allow /dev like media_path is already allowed. 2. shell.py: remove | from home_paths regex prefix Loki query operator `|~` was misinterpreted as pipe + home directory, causing false workspace violation errors. 3. loop.py: change _streamed from blacklist to whitelist stop_reason "tool_error" was not in the exclusion set {"ask_user", "error"}, so _streamed=True was set on fatal errors. channel manager then skipped channel.send() because it assumed the content was already streamed — but it never was. Whitelist to only {"stop", "end_turn", "max_tokens"}. Also fixes a pre-existing Windows bug in _spawn where create_subprocess_exec + list2cmdline breaks commands with paths containing spaces (e.g. D:\Program Files\python.exe). Closes: #3599, #3605
This commit is contained in:
parent
2a7433b7ec
commit
d3689d143c
@ -1135,7 +1135,7 @@ class AgentLoop:
|
||||
ask_user_options_from_messages(all_msgs) if stop_reason == "ask_user" else [],
|
||||
msg.channel,
|
||||
)
|
||||
if on_stream is not None and stop_reason not in {"ask_user", "error"}:
|
||||
if on_stream is not None and stop_reason not in {"ask_user", "error", "tool_error"}:
|
||||
meta["_streamed"] = True
|
||||
return OutboundMessage(
|
||||
channel=msg.channel,
|
||||
|
||||
@ -220,9 +220,12 @@ class ExecTool(Tool):
|
||||
) -> asyncio.subprocess.Process:
|
||||
"""Launch *command* in a platform-appropriate shell."""
|
||||
if _IS_WINDOWS:
|
||||
comspec = env.get("COMSPEC", os.environ.get("COMSPEC", "cmd.exe"))
|
||||
return await asyncio.create_subprocess_exec(
|
||||
comspec, "/c", command,
|
||||
# create_subprocess_exec re-quotes args via list2cmdline, which
|
||||
# breaks commands containing paths with spaces (e.g. "D:\Program
|
||||
# Files\python.exe" "script.py"). create_subprocess_shell passes
|
||||
# the raw command string to COMSPEC without re-quoting.
|
||||
return await asyncio.create_subprocess_shell(
|
||||
command,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=cwd,
|
||||
@ -346,11 +349,14 @@ class ExecTool(Tool):
|
||||
continue
|
||||
|
||||
media_path = get_media_dir().resolve()
|
||||
dev_path = Path("/dev").resolve()
|
||||
if (p.is_absolute()
|
||||
and cwd_path not in p.parents
|
||||
and p != cwd_path
|
||||
and media_path not in p.parents
|
||||
and p != media_path
|
||||
and dev_path not in p.parents
|
||||
and p != dev_path
|
||||
):
|
||||
return (
|
||||
"Error: Command blocked by safety guard (path outside working dir)"
|
||||
@ -372,5 +378,5 @@ class ExecTool(Tool):
|
||||
# NOTE: `*` is required so `C:\` (nothing after the slash) is still extracted.
|
||||
win_paths = re.findall(r"[A-Za-z]:\\[^\s\"'|><;]*", command)
|
||||
posix_paths = re.findall(r"(?:^|[\s|>'\"])(/[^\s\"'>;|<]+)", command) # POSIX: /absolute only
|
||||
home_paths = re.findall(r"(?:^|[\s|>'\"])(~[^\s\"'>;|<]*)", command) # POSIX/Windows home shortcut: ~
|
||||
home_paths = re.findall(r"(?:^|[\s>'\"])(~[^\s\"'>;|<]*)", command) # POSIX/Windows home shortcut: ~
|
||||
return win_paths + posix_paths + home_paths
|
||||
|
||||
@ -112,33 +112,31 @@ class TestSpawnUnix:
|
||||
class TestSpawnWindows:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_uses_comspec_from_env(self):
|
||||
async def test_uses_create_subprocess_shell(self):
|
||||
env = {"COMSPEC": r"C:\Windows\system32\cmd.exe", "PATH": ""}
|
||||
with (
|
||||
patch("nanobot.agent.tools.shell._IS_WINDOWS", True),
|
||||
patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_exec,
|
||||
patch("asyncio.create_subprocess_shell", new_callable=AsyncMock) as mock_shell,
|
||||
):
|
||||
mock_exec.return_value = AsyncMock()
|
||||
await ExecTool._spawn("dir", r"C:\Users", env)
|
||||
mock_shell.return_value = AsyncMock()
|
||||
await ExecTool._spawn("dir", r"C:\work", env)
|
||||
|
||||
args = mock_exec.call_args[0]
|
||||
assert "cmd.exe" in args[0]
|
||||
assert "/c" in args
|
||||
args = mock_shell.call_args[0]
|
||||
assert "dir" in args
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_falls_back_to_default_comspec(self):
|
||||
env = {"PATH": ""}
|
||||
async def test_passes_cwd_and_env(self):
|
||||
env = {"PATH": "/usr/bin"}
|
||||
with (
|
||||
patch("nanobot.agent.tools.shell._IS_WINDOWS", True),
|
||||
patch.dict("os.environ", {}, clear=True),
|
||||
patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_exec,
|
||||
patch("asyncio.create_subprocess_shell", new_callable=AsyncMock) as mock_shell,
|
||||
):
|
||||
mock_exec.return_value = AsyncMock()
|
||||
await ExecTool._spawn("dir", r"C:\Users", env)
|
||||
mock_shell.return_value = AsyncMock()
|
||||
await ExecTool._spawn("echo hi", r"C:\work", env)
|
||||
|
||||
args = mock_exec.call_args[0]
|
||||
assert args[0] == "cmd.exe"
|
||||
kwargs = mock_shell.call_args[1]
|
||||
assert kwargs["cwd"] == r"C:\work"
|
||||
assert kwargs["env"] == env
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@ -315,6 +315,27 @@ def test_exec_guard_blocks_windows_drive_root_outside_workspace(monkeypatch) ->
|
||||
assert "hard policy boundary" in error
|
||||
|
||||
|
||||
def test_exec_guard_allows_dev_null_redirect(tmp_path) -> None:
|
||||
tool = ExecTool(restrict_to_workspace=True)
|
||||
ws = tmp_path / "workspace"
|
||||
ws.mkdir()
|
||||
(ws / "file.txt").write_text("ok", encoding="utf-8")
|
||||
error = tool._guard_command(f'rm "{ws / "file.txt"}" 2>/dev/null', str(ws))
|
||||
assert error is None
|
||||
|
||||
|
||||
def test_exec_guard_allows_dev_urandom(tmp_path) -> None:
|
||||
tool = ExecTool(restrict_to_workspace=True)
|
||||
error = tool._guard_command("cat /dev/urandom | head -c 16 > random.bin", str(tmp_path))
|
||||
assert error is None
|
||||
|
||||
|
||||
def test_exec_extract_absolute_paths_ignores_pipe_tilde() -> None:
|
||||
cmd = "python query.py --query '{job=\"app\"} |~ \"error\"'"
|
||||
paths = ExecTool._extract_absolute_paths(cmd)
|
||||
assert not any(p.startswith("~") for p in paths)
|
||||
|
||||
|
||||
# --- cast_params tests ---
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user