nanobot/tests/tools/test_read_enhancements.py
aiguozhi123456 53ba410e49 feat(read_file): add DOCX, XLSX, PPTX support via document.extract_text()
Wire up the existing office document extractors in document.py to
ReadFileTool by adding an extension guard and _read_office_doc() method
that follows the established PDF pattern. Handles missing libraries,
corrupt files, empty documents, and 128K truncation consistently.
2026-04-21 22:12:19 +08:00

370 lines
14 KiB
Python

"""Tests for ReadFileTool enhancements: description fix, read dedup, PDF support, device blacklist, office docs."""
import os
import sys
from unittest.mock import patch
import pytest
from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool
from nanobot.agent.tools import file_state
@pytest.fixture(autouse=True)
def _clear_file_state():
file_state.clear()
yield
file_state.clear()
# ---------------------------------------------------------------------------
# Description fix
# ---------------------------------------------------------------------------
class TestReadDescriptionFix:
def test_description_mentions_image_support(self):
tool = ReadFileTool()
assert "image" in tool.description.lower()
def test_description_no_longer_says_cannot_read_images(self):
tool = ReadFileTool()
assert "cannot read binary files or images" not in tool.description.lower()
# ---------------------------------------------------------------------------
# Read deduplication
# ---------------------------------------------------------------------------
class TestReadDedup:
"""Same file + same offset/limit + unchanged mtime -> short stub."""
@pytest.fixture()
def tool(self, tmp_path):
return ReadFileTool(workspace=tmp_path)
@pytest.fixture()
def write_tool(self, tmp_path):
return WriteFileTool(workspace=tmp_path)
@pytest.mark.asyncio
async def test_second_read_returns_unchanged_stub(self, tool, tmp_path):
f = tmp_path / "data.txt"
f.write_text("\n".join(f"line {i}" for i in range(100)), encoding="utf-8")
first = await tool.execute(path=str(f))
assert "line 0" in first
second = await tool.execute(path=str(f))
assert "unchanged" in second.lower()
# Stub should not contain file content
assert "line 0" not in second
@pytest.mark.asyncio
async def test_read_after_external_modification_returns_full(self, tool, tmp_path):
f = tmp_path / "data.txt"
f.write_text("original", encoding="utf-8")
await tool.execute(path=str(f))
# Modify the file externally
f.write_text("modified content", encoding="utf-8")
second = await tool.execute(path=str(f))
assert "modified content" in second
@pytest.mark.asyncio
async def test_different_offset_returns_full(self, tool, tmp_path):
f = tmp_path / "data.txt"
f.write_text("\n".join(f"line {i}" for i in range(1, 21)), encoding="utf-8")
await tool.execute(path=str(f), offset=1, limit=5)
second = await tool.execute(path=str(f), offset=6, limit=5)
# Different offset → full read, not stub
assert "line 6" in second
@pytest.mark.asyncio
async def test_first_read_after_write_returns_full_content(self, tool, write_tool, tmp_path):
f = tmp_path / "fresh.txt"
result = await write_tool.execute(path=str(f), content="hello")
assert "Successfully" in result
read_result = await tool.execute(path=str(f))
assert "hello" in read_result
assert "unchanged" not in read_result.lower()
@pytest.mark.asyncio
async def test_dedup_does_not_apply_to_images(self, tool, tmp_path):
f = tmp_path / "img.png"
f.write_bytes(b"\x89PNG\r\n\x1a\nfake-png-data")
first = await tool.execute(path=str(f))
assert isinstance(first, list)
second = await tool.execute(path=str(f))
# Images should always return full content blocks, not a stub
assert isinstance(second, list)
# ---------------------------------------------------------------------------
# PDF support
# ---------------------------------------------------------------------------
class TestReadPdf:
@pytest.fixture()
def tool(self, tmp_path):
return ReadFileTool(workspace=tmp_path)
@pytest.mark.asyncio
async def test_pdf_returns_text_content(self, tool, tmp_path):
fitz = pytest.importorskip("fitz")
pdf_path = tmp_path / "test.pdf"
doc = fitz.open()
page = doc.new_page()
page.insert_text((72, 72), "Hello PDF World")
doc.save(str(pdf_path))
doc.close()
result = await tool.execute(path=str(pdf_path))
assert "Hello PDF World" in result
@pytest.mark.asyncio
async def test_pdf_pages_parameter(self, tool, tmp_path):
fitz = pytest.importorskip("fitz")
pdf_path = tmp_path / "multi.pdf"
doc = fitz.open()
for i in range(5):
page = doc.new_page()
page.insert_text((72, 72), f"Page {i + 1} content")
doc.save(str(pdf_path))
doc.close()
result = await tool.execute(path=str(pdf_path), pages="2-3")
assert "Page 2 content" in result
assert "Page 3 content" in result
assert "Page 1 content" not in result
@pytest.mark.asyncio
async def test_pdf_file_not_found_error(self, tool, tmp_path):
result = await tool.execute(path=str(tmp_path / "nope.pdf"))
assert "Error" in result
assert "not found" in result
# ---------------------------------------------------------------------------
# Device path blacklist
# ---------------------------------------------------------------------------
@pytest.mark.skipif(sys.platform == "win32", reason="/dev directory doesn't exist on Windows")
class TestReadDeviceBlacklist:
@pytest.fixture()
def tool(self):
return ReadFileTool()
@pytest.mark.asyncio
async def test_dev_random_blocked(self, tool):
result = await tool.execute(path="/dev/random")
assert "Error" in result
assert "blocked" in result.lower() or "device" in result.lower()
@pytest.mark.asyncio
async def test_dev_urandom_blocked(self, tool):
result = await tool.execute(path="/dev/urandom")
assert "Error" in result
@pytest.mark.asyncio
async def test_dev_zero_blocked(self, tool):
result = await tool.execute(path="/dev/zero")
assert "Error" in result
@pytest.mark.asyncio
async def test_proc_fd_blocked(self, tool):
result = await tool.execute(path="/proc/self/fd/0")
assert "Error" in result
@pytest.mark.asyncio
async def test_symlink_to_dev_zero_blocked(self, tmp_path):
tool = ReadFileTool(workspace=tmp_path)
link = tmp_path / "zero-link"
link.symlink_to("/dev/zero")
result = await tool.execute(path=str(link))
assert "Error" in result
assert "blocked" in result.lower() or "device" in result.lower()
# ---------------------------------------------------------------------------
# file_state: mtime-unchanged / content-changed fallback
# ---------------------------------------------------------------------------
# On filesystems with coarse mtime resolution (NTFS ~100ms, FAT 2s) a fast
# write-after-read can leave mtime unchanged. The content-hash fallback is
# what protects against stale-read warnings being false-negative on those
# platforms. Lock that behavior down here so nobody reverts it silently.
class TestFileStateHashFallback:
def test_check_read_warns_when_content_changed_but_mtime_same(self, tmp_path):
f = tmp_path / "data.txt"
f.write_text("original", encoding="utf-8")
file_state.record_read(f)
original_mtime = os.path.getmtime(f)
f.write_text("modified", encoding="utf-8")
os.utime(f, (original_mtime, original_mtime))
assert os.path.getmtime(f) == original_mtime
warning = file_state.check_read(f)
assert warning is not None
assert "modified" in warning.lower()
def test_check_read_passes_when_content_and_mtime_unchanged(self, tmp_path):
f = tmp_path / "data.txt"
f.write_text("stable", encoding="utf-8")
file_state.record_read(f)
assert file_state.check_read(f) is None
# ---------------------------------------------------------------------------
# Line-ending normalization
# ---------------------------------------------------------------------------
# ReadFileTool normalizes CRLF -> LF before line-splitting. This primarily
# helps Windows users whose checkouts carry CRLF line endings and whose
# subsequent StrReplace edits would otherwise miss on `\r` boundaries. The
# normalization applies on all platforms; these tests lock that in so the
# behavior is intentional and discoverable, not accidental.
class TestReadFileLineEndingNormalization:
@pytest.fixture()
def tool(self, tmp_path):
return ReadFileTool(workspace=tmp_path)
@pytest.mark.asyncio
async def test_crlf_is_normalized_to_lf(self, tool, tmp_path):
f = tmp_path / "crlf.txt"
f.write_bytes(b"alpha\r\nbeta\r\ngamma\r\n")
result = await tool.execute(path=str(f))
assert "\r" not in result
assert "alpha" in result and "beta" in result and "gamma" in result
@pytest.mark.asyncio
async def test_lf_only_is_preserved(self, tool, tmp_path):
f = tmp_path / "lf.txt"
f.write_bytes(b"alpha\nbeta\ngamma\n")
result = await tool.execute(path=str(f))
assert "\r" not in result
assert "alpha" in result and "beta" in result and "gamma" in result
# ---------------------------------------------------------------------------
# Office document support (DOCX, XLSX, PPTX)
# ---------------------------------------------------------------------------
class TestReadOfficeDocuments:
@pytest.fixture()
def tool(self, tmp_path):
return ReadFileTool(workspace=tmp_path)
@pytest.mark.asyncio
async def test_docx_returns_extracted_text(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value="Title\n\nParagraph 1"):
f = tmp_path / "test.docx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert "Title" in result
assert "Paragraph 1" in result
assert "Error" not in result
@pytest.mark.asyncio
async def test_xlsx_returns_extracted_text(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value="--- Sheet: Sheet1 ---\nName\tAge\nAlice\t30"):
f = tmp_path / "test.xlsx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert "Sheet1" in result
assert "Alice" in result
@pytest.mark.asyncio
async def test_pptx_returns_extracted_text(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value="--- Slide 1 ---\nWelcome\n--- Slide 2 ---\nContent"):
f = tmp_path / "test.pptx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert "Welcome" in result
assert "Content" in result
@pytest.mark.asyncio
async def test_docx_missing_library(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value="[error: python-docx not installed]"):
f = tmp_path / "test.docx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert "Error" in result
assert "python-docx not installed" in result
@pytest.mark.asyncio
async def test_docx_corrupt_file(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value="[error: failed to extract DOCX: bad zip]"):
f = tmp_path / "test.docx"
f.write_bytes(b"not-a-zip")
result = await tool.execute(path=str(f))
assert "Error" in result
assert "failed to extract DOCX" in result
@pytest.mark.asyncio
async def test_unsupported_extension(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value=None):
f = tmp_path / "test.docx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert "Error" in result
assert "Unsupported" in result
@pytest.mark.asyncio
async def test_empty_document_returns_descriptive_message(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value=""):
f = tmp_path / "empty.docx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert "no extractable text" in result
class TestOfficeDocTruncation:
@pytest.fixture()
def tool(self, tmp_path):
return ReadFileTool(workspace=tmp_path)
@pytest.mark.asyncio
async def test_large_document_truncated(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value="x" * 200_000):
f = tmp_path / "large.docx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert len(result) <= ReadFileTool._MAX_CHARS + 100
assert "truncated at ~128K chars" in result
@pytest.mark.asyncio
async def test_small_document_not_truncated(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value="Hello world"):
f = tmp_path / "small.docx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert "truncated" not in result
assert "Hello world" in result
@pytest.mark.asyncio
async def test_error_response_not_truncated(self, tool, tmp_path):
with patch("nanobot.utils.document.extract_text", return_value="[error: failed to extract DOCX: something went wrong]"):
f = tmp_path / "bad.docx"
f.write_bytes(b"PK")
result = await tool.execute(path=str(f))
assert "Error" in result
assert "truncated" not in result
class TestReadDescriptionUpdate:
def test_description_mentions_documents(self):
tool = ReadFileTool()
desc = tool.description.lower()
assert "document" in desc or "docx" in desc or "xlsx" in desc or "pptx" in desc
def test_description_no_longer_says_cannot_read(self):
tool = ReadFileTool()
assert "cannot read" not in tool.description.lower()