mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-04 00:35:58 +00:00
431 lines
16 KiB
Python
431 lines
16 KiB
Python
"""Tests for ReadFileTool enhancements: description fix, read dedup, PDF support, device blacklist, office docs."""
|
|
|
|
import os
|
|
import sys
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool
|
|
from nanobot.agent.tools import file_state
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _clear_file_state():
|
|
file_state.clear()
|
|
yield
|
|
file_state.clear()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Description fix
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestReadDescriptionFix:
|
|
|
|
def test_description_mentions_image_support(self):
|
|
tool = ReadFileTool()
|
|
assert "image" in tool.description.lower()
|
|
|
|
def test_description_no_longer_says_cannot_read_images(self):
|
|
tool = ReadFileTool()
|
|
assert "cannot read binary files or images" not in tool.description.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Read deduplication
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestReadDedup:
|
|
"""Same file + same offset/limit + unchanged mtime -> short stub."""
|
|
|
|
@pytest.fixture()
|
|
def tool(self, tmp_path):
|
|
return ReadFileTool(workspace=tmp_path)
|
|
|
|
@pytest.fixture()
|
|
def write_tool(self, tmp_path):
|
|
return WriteFileTool(workspace=tmp_path)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_second_read_returns_unchanged_stub(self, tool, tmp_path):
|
|
f = tmp_path / "data.txt"
|
|
f.write_text("\n".join(f"line {i}" for i in range(100)), encoding="utf-8")
|
|
first = await tool.execute(path=str(f))
|
|
assert "line 0" in first
|
|
second = await tool.execute(path=str(f))
|
|
assert "unchanged" in second.lower()
|
|
# Stub should not contain file content
|
|
assert "line 0" not in second
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_after_external_modification_returns_full(self, tool, tmp_path):
|
|
f = tmp_path / "data.txt"
|
|
f.write_text("original", encoding="utf-8")
|
|
await tool.execute(path=str(f))
|
|
# Modify the file externally
|
|
f.write_text("modified content", encoding="utf-8")
|
|
second = await tool.execute(path=str(f))
|
|
assert "modified content" in second
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_different_offset_returns_full(self, tool, tmp_path):
|
|
f = tmp_path / "data.txt"
|
|
f.write_text("\n".join(f"line {i}" for i in range(1, 21)), encoding="utf-8")
|
|
await tool.execute(path=str(f), offset=1, limit=5)
|
|
second = await tool.execute(path=str(f), offset=6, limit=5)
|
|
# Different offset → full read, not stub
|
|
assert "line 6" in second
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_first_read_after_write_returns_full_content(self, tool, write_tool, tmp_path):
|
|
f = tmp_path / "fresh.txt"
|
|
result = await write_tool.execute(path=str(f), content="hello")
|
|
assert "Successfully" in result
|
|
read_result = await tool.execute(path=str(f))
|
|
assert "hello" in read_result
|
|
assert "unchanged" not in read_result.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dedup_does_not_apply_to_images(self, tool, tmp_path):
|
|
f = tmp_path / "img.png"
|
|
f.write_bytes(b"\x89PNG\r\n\x1a\nfake-png-data")
|
|
first = await tool.execute(path=str(f))
|
|
assert isinstance(first, list)
|
|
second = await tool.execute(path=str(f))
|
|
# Images should always return full content blocks, not a stub
|
|
assert isinstance(second, list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cross-session isolation (issue #3571)
|
|
# ---------------------------------------------------------------------------
|
|
# Each session must keep its own read cache. When session A reads a file,
|
|
# session B reading the same file must still receive the full content, not
|
|
# the "[File unchanged since last read]" dedup stub. The stub is only valid
|
|
# within the session that first cached the read.
|
|
|
|
class TestReadDedupSessionIsolation:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_separate_sessions_do_not_share_dedup_state(self, tmp_path):
|
|
f = tmp_path / "shared.txt"
|
|
f.write_text("\n".join(f"line {i}" for i in range(10)), encoding="utf-8")
|
|
|
|
session_a_tool = ReadFileTool(workspace=tmp_path)
|
|
session_b_tool = ReadFileTool(workspace=tmp_path)
|
|
|
|
first = await session_a_tool.execute(path=str(f))
|
|
assert "line 0" in first
|
|
|
|
# Session B has never read this file before — it must see the full
|
|
# content, not the dedup stub from session A.
|
|
second = await session_b_tool.execute(path=str(f))
|
|
assert "unchanged" not in second.lower(), (
|
|
"Session B should not inherit session A's read-dedup state. "
|
|
f"Got: {second!r}"
|
|
)
|
|
assert "line 0" in second
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_shared_loop_tool_uses_bound_session_state(self, tmp_path):
|
|
f = tmp_path / "shared.txt"
|
|
f.write_text("\n".join(f"line {i}" for i in range(10)), encoding="utf-8")
|
|
|
|
# AgentLoop registers one shared ReadFileTool instance. The session
|
|
# boundary is the task-local FileStates binding, not the tool object.
|
|
shared_tool = ReadFileTool(workspace=tmp_path)
|
|
session_a = file_state.FileStates()
|
|
session_b = file_state.FileStates()
|
|
|
|
token = file_state.bind_file_states(session_a)
|
|
try:
|
|
first = await shared_tool.execute(path=str(f))
|
|
repeat = await shared_tool.execute(path=str(f))
|
|
finally:
|
|
file_state.reset_file_states(token)
|
|
|
|
assert "line 0" in first
|
|
assert "unchanged" in repeat.lower()
|
|
|
|
token = file_state.bind_file_states(session_b)
|
|
try:
|
|
second_session_read = await shared_tool.execute(path=str(f))
|
|
finally:
|
|
file_state.reset_file_states(token)
|
|
|
|
assert "unchanged" not in second_session_read.lower()
|
|
assert "line 0" in second_session_read
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PDF support
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestReadPdf:
|
|
|
|
@pytest.fixture()
|
|
def tool(self, tmp_path):
|
|
return ReadFileTool(workspace=tmp_path)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pdf_returns_text_content(self, tool, tmp_path):
|
|
fitz = pytest.importorskip("fitz")
|
|
pdf_path = tmp_path / "test.pdf"
|
|
doc = fitz.open()
|
|
page = doc.new_page()
|
|
page.insert_text((72, 72), "Hello PDF World")
|
|
doc.save(str(pdf_path))
|
|
doc.close()
|
|
|
|
result = await tool.execute(path=str(pdf_path))
|
|
assert "Hello PDF World" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pdf_pages_parameter(self, tool, tmp_path):
|
|
fitz = pytest.importorskip("fitz")
|
|
pdf_path = tmp_path / "multi.pdf"
|
|
doc = fitz.open()
|
|
for i in range(5):
|
|
page = doc.new_page()
|
|
page.insert_text((72, 72), f"Page {i + 1} content")
|
|
doc.save(str(pdf_path))
|
|
doc.close()
|
|
|
|
result = await tool.execute(path=str(pdf_path), pages="2-3")
|
|
assert "Page 2 content" in result
|
|
assert "Page 3 content" in result
|
|
assert "Page 1 content" not in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pdf_file_not_found_error(self, tool, tmp_path):
|
|
result = await tool.execute(path=str(tmp_path / "nope.pdf"))
|
|
assert "Error" in result
|
|
assert "not found" in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Device path blacklist
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="/dev directory doesn't exist on Windows")
|
|
class TestReadDeviceBlacklist:
|
|
|
|
@pytest.fixture()
|
|
def tool(self):
|
|
return ReadFileTool()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dev_random_blocked(self, tool):
|
|
result = await tool.execute(path="/dev/random")
|
|
assert "Error" in result
|
|
assert "blocked" in result.lower() or "device" in result.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dev_urandom_blocked(self, tool):
|
|
result = await tool.execute(path="/dev/urandom")
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dev_zero_blocked(self, tool):
|
|
result = await tool.execute(path="/dev/zero")
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_proc_fd_blocked(self, tool):
|
|
result = await tool.execute(path="/proc/self/fd/0")
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_symlink_to_dev_zero_blocked(self, tmp_path):
|
|
tool = ReadFileTool(workspace=tmp_path)
|
|
link = tmp_path / "zero-link"
|
|
link.symlink_to("/dev/zero")
|
|
result = await tool.execute(path=str(link))
|
|
assert "Error" in result
|
|
assert "blocked" in result.lower() or "device" in result.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# file_state: mtime-unchanged / content-changed fallback
|
|
# ---------------------------------------------------------------------------
|
|
# On filesystems with coarse mtime resolution (NTFS ~100ms, FAT 2s) a fast
|
|
# write-after-read can leave mtime unchanged. The content-hash fallback is
|
|
# what protects against stale-read warnings being false-negative on those
|
|
# platforms. Lock that behavior down here so nobody reverts it silently.
|
|
|
|
class TestFileStateHashFallback:
|
|
|
|
def test_check_read_warns_when_content_changed_but_mtime_same(self, tmp_path):
|
|
f = tmp_path / "data.txt"
|
|
f.write_text("original", encoding="utf-8")
|
|
file_state.record_read(f)
|
|
original_mtime = os.path.getmtime(f)
|
|
|
|
f.write_text("modified", encoding="utf-8")
|
|
os.utime(f, (original_mtime, original_mtime))
|
|
assert os.path.getmtime(f) == original_mtime
|
|
|
|
warning = file_state.check_read(f)
|
|
assert warning is not None
|
|
assert "modified" in warning.lower()
|
|
|
|
def test_check_read_passes_when_content_and_mtime_unchanged(self, tmp_path):
|
|
f = tmp_path / "data.txt"
|
|
f.write_text("stable", encoding="utf-8")
|
|
file_state.record_read(f)
|
|
|
|
assert file_state.check_read(f) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Line-ending normalization
|
|
# ---------------------------------------------------------------------------
|
|
# ReadFileTool normalizes CRLF -> LF before line-splitting. This primarily
|
|
# helps Windows users whose checkouts carry CRLF line endings and whose
|
|
# subsequent StrReplace edits would otherwise miss on `\r` boundaries. The
|
|
# normalization applies on all platforms; these tests lock that in so the
|
|
# behavior is intentional and discoverable, not accidental.
|
|
|
|
class TestReadFileLineEndingNormalization:
|
|
|
|
@pytest.fixture()
|
|
def tool(self, tmp_path):
|
|
return ReadFileTool(workspace=tmp_path)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_crlf_is_normalized_to_lf(self, tool, tmp_path):
|
|
f = tmp_path / "crlf.txt"
|
|
f.write_bytes(b"alpha\r\nbeta\r\ngamma\r\n")
|
|
result = await tool.execute(path=str(f))
|
|
assert "\r" not in result
|
|
assert "alpha" in result and "beta" in result and "gamma" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_lf_only_is_preserved(self, tool, tmp_path):
|
|
f = tmp_path / "lf.txt"
|
|
f.write_bytes(b"alpha\nbeta\ngamma\n")
|
|
result = await tool.execute(path=str(f))
|
|
assert "\r" not in result
|
|
assert "alpha" in result and "beta" in result and "gamma" in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Office document support (DOCX, XLSX, PPTX)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestReadOfficeDocuments:
|
|
|
|
@pytest.fixture()
|
|
def tool(self, tmp_path):
|
|
return ReadFileTool(workspace=tmp_path)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_docx_returns_extracted_text(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value="Title\n\nParagraph 1"):
|
|
f = tmp_path / "test.docx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert "Title" in result
|
|
assert "Paragraph 1" in result
|
|
assert "Error" not in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_xlsx_returns_extracted_text(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value="--- Sheet: Sheet1 ---\nName\tAge\nAlice\t30"):
|
|
f = tmp_path / "test.xlsx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert "Sheet1" in result
|
|
assert "Alice" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pptx_returns_extracted_text(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value="--- Slide 1 ---\nWelcome\n--- Slide 2 ---\nContent"):
|
|
f = tmp_path / "test.pptx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert "Welcome" in result
|
|
assert "Content" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_docx_missing_library(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value="[error: python-docx not installed]"):
|
|
f = tmp_path / "test.docx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert "Error" in result
|
|
assert "python-docx not installed" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_docx_corrupt_file(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value="[error: failed to extract DOCX: bad zip]"):
|
|
f = tmp_path / "test.docx"
|
|
f.write_bytes(b"not-a-zip")
|
|
result = await tool.execute(path=str(f))
|
|
assert "Error" in result
|
|
assert "failed to extract DOCX" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unsupported_extension(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value=None):
|
|
f = tmp_path / "test.docx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert "Error" in result
|
|
assert "Unsupported" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_document_returns_descriptive_message(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value=""):
|
|
f = tmp_path / "empty.docx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert "no extractable text" in result
|
|
|
|
|
|
class TestOfficeDocTruncation:
|
|
|
|
@pytest.fixture()
|
|
def tool(self, tmp_path):
|
|
return ReadFileTool(workspace=tmp_path)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_large_document_truncated(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value="x" * 200_000):
|
|
f = tmp_path / "large.docx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert len(result) <= ReadFileTool._MAX_CHARS + 100
|
|
assert "truncated at ~128K chars" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_small_document_not_truncated(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value="Hello world"):
|
|
f = tmp_path / "small.docx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert "truncated" not in result
|
|
assert "Hello world" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_error_response_not_truncated(self, tool, tmp_path):
|
|
with patch("nanobot.utils.document.extract_text", return_value="[error: failed to extract DOCX: something went wrong]"):
|
|
f = tmp_path / "bad.docx"
|
|
f.write_bytes(b"PK")
|
|
result = await tool.execute(path=str(f))
|
|
assert "Error" in result
|
|
assert "truncated" not in result
|
|
|
|
|
|
class TestReadDescriptionUpdate:
|
|
|
|
def test_description_mentions_documents(self):
|
|
tool = ReadFileTool()
|
|
desc = tool.description.lower()
|
|
assert "document" in desc or "docx" in desc or "xlsx" in desc or "pptx" in desc
|
|
|
|
def test_description_no_longer_says_cannot_read(self):
|
|
tool = ReadFileTool()
|
|
assert "cannot read" not in tool.description.lower()
|