mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-19 16:12:30 +00:00
fix(message): share workspace path resolver
This commit is contained in:
parent
57d7847dc8
commit
164614ccf2
@ -9,51 +9,15 @@ from typing import Any
|
||||
|
||||
from nanobot.agent.tools.base import Tool, tool_parameters
|
||||
from nanobot.agent.tools.file_state import FileStates, _hash_file, current_file_states
|
||||
from nanobot.agent.tools.path_utils import resolve_workspace_path
|
||||
from nanobot.agent.tools.schema import (
|
||||
BooleanSchema,
|
||||
IntegerSchema,
|
||||
StringSchema,
|
||||
tool_parameters_schema,
|
||||
)
|
||||
from nanobot.config.paths import get_media_dir
|
||||
from nanobot.utils.helpers import build_image_content_blocks, detect_image_mime
|
||||
|
||||
_FS_WORKSPACE_BOUNDARY_NOTE = (
|
||||
" (this is a hard policy boundary, not a transient failure; "
|
||||
"do not retry with shell tricks or alternative tools, and ask "
|
||||
"the user how to proceed if the resource is genuinely required)"
|
||||
)
|
||||
|
||||
|
||||
def _resolve_path(
|
||||
path: str,
|
||||
workspace: Path | None = None,
|
||||
allowed_dir: Path | None = None,
|
||||
extra_allowed_dirs: list[Path] | None = None,
|
||||
) -> Path:
|
||||
"""Resolve path against workspace (if relative) and enforce directory restriction."""
|
||||
p = Path(path).expanduser()
|
||||
if not p.is_absolute() and workspace:
|
||||
p = workspace / p
|
||||
resolved = p.resolve()
|
||||
if allowed_dir:
|
||||
media_path = get_media_dir().resolve()
|
||||
all_dirs = [allowed_dir] + [media_path] + (extra_allowed_dirs or [])
|
||||
if not any(_is_under(resolved, d) for d in all_dirs):
|
||||
raise PermissionError(
|
||||
f"Path {path} is outside allowed directory {allowed_dir}"
|
||||
+ _FS_WORKSPACE_BOUNDARY_NOTE
|
||||
)
|
||||
return resolved
|
||||
|
||||
|
||||
def _is_under(path: Path, directory: Path) -> bool:
|
||||
try:
|
||||
path.relative_to(directory.resolve())
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
class _FsTool(Tool):
|
||||
"""Shared base for filesystem tools — common init and path resolution."""
|
||||
@ -98,7 +62,12 @@ class _FsTool(Tool):
|
||||
return current_file_states(self._fallback_file_states)
|
||||
|
||||
def _resolve(self, path: str) -> Path:
|
||||
return _resolve_path(path, self._workspace, self._allowed_dir, self._extra_allowed_dirs)
|
||||
return resolve_workspace_path(
|
||||
path,
|
||||
self._workspace,
|
||||
self._allowed_dir,
|
||||
self._extra_allowed_dirs,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@ -6,7 +6,7 @@ from typing import Any, Awaitable, Callable
|
||||
|
||||
from nanobot.agent.tools.base import Tool, tool_parameters
|
||||
from nanobot.agent.tools.context import ContextAware, RequestContext
|
||||
from nanobot.agent.tools.filesystem import _resolve_path
|
||||
from nanobot.agent.tools.path_utils import resolve_workspace_path
|
||||
from nanobot.agent.tools.schema import ArraySchema, StringSchema, tool_parameters_schema
|
||||
from nanobot.bus.events import OutboundMessage
|
||||
from nanobot.config.paths import get_workspace_path
|
||||
@ -146,7 +146,7 @@ class MessageTool(Tool, ContextAware):
|
||||
path = Path(p).expanduser()
|
||||
resolved.append(p if path.is_absolute() else str(self._workspace / path))
|
||||
else:
|
||||
resolved.append(str(_resolve_path(p, self._workspace, allowed_dir)))
|
||||
resolved.append(str(resolve_workspace_path(p, self._workspace, allowed_dir)))
|
||||
return resolved
|
||||
|
||||
async def execute(
|
||||
|
||||
42
nanobot/agent/tools/path_utils.py
Normal file
42
nanobot/agent/tools/path_utils.py
Normal file
@ -0,0 +1,42 @@
|
||||
"""Shared path helpers for workspace-scoped tools."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from nanobot.config.paths import get_media_dir
|
||||
|
||||
WORKSPACE_BOUNDARY_NOTE = (
|
||||
" (this is a hard policy boundary, not a transient failure; "
|
||||
"do not retry with shell tricks or alternative tools, and ask "
|
||||
"the user how to proceed if the resource is genuinely required)"
|
||||
)
|
||||
|
||||
|
||||
def is_under(path: Path, directory: Path) -> bool:
|
||||
"""Return True when path resolves under directory."""
|
||||
try:
|
||||
path.relative_to(directory.resolve())
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def resolve_workspace_path(
|
||||
path: str,
|
||||
workspace: Path | None = None,
|
||||
allowed_dir: Path | None = None,
|
||||
extra_allowed_dirs: list[Path] | None = None,
|
||||
) -> Path:
|
||||
"""Resolve path against workspace and enforce allowed directory containment."""
|
||||
p = Path(path).expanduser()
|
||||
if not p.is_absolute() and workspace:
|
||||
p = workspace / p
|
||||
resolved = p.resolve()
|
||||
if allowed_dir:
|
||||
media_path = get_media_dir().resolve()
|
||||
all_dirs = [allowed_dir, media_path, *(extra_allowed_dirs or [])]
|
||||
if not any(is_under(resolved, d) for d in all_dirs):
|
||||
raise PermissionError(
|
||||
f"Path {path} is outside allowed directory {allowed_dir}"
|
||||
+ WORKSPACE_BOUNDARY_NOTE
|
||||
)
|
||||
return resolved
|
||||
@ -9,7 +9,6 @@ from nanobot.agent.tools.filesystem import (
|
||||
_find_match,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ReadFileTool
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -330,7 +329,7 @@ class TestWorkspaceRestriction:
|
||||
media_file = media_dir / "photo.txt"
|
||||
media_file.write_text("shared media", encoding="utf-8")
|
||||
|
||||
monkeypatch.setattr("nanobot.agent.tools.filesystem.get_media_dir", lambda: media_dir)
|
||||
monkeypatch.setattr("nanobot.agent.tools.path_utils.get_media_dir", lambda: media_dir)
|
||||
|
||||
tool = ReadFileTool(workspace=workspace, allowed_dir=workspace)
|
||||
result = await tool.execute(path=str(media_file))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user