mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-06 19:23:39 +00:00
326 lines
10 KiB
Python
326 lines
10 KiB
Python
"""Tests for grep/glob search tools."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.agent.subagent import SubagentManager
|
|
from nanobot.agent.tools.search import GlobTool, GrepTool
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_glob_matches_recursively_and_skips_noise_dirs(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
(tmp_path / "nested").mkdir()
|
|
(tmp_path / "node_modules").mkdir()
|
|
(tmp_path / "src" / "app.py").write_text("print('ok')\n", encoding="utf-8")
|
|
(tmp_path / "nested" / "util.py").write_text("print('ok')\n", encoding="utf-8")
|
|
(tmp_path / "node_modules" / "skip.py").write_text("print('skip')\n", encoding="utf-8")
|
|
|
|
tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(pattern="*.py", path=".")
|
|
|
|
assert "src/app.py" in result
|
|
assert "nested/util.py" in result
|
|
assert "node_modules/skip.py" not in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_glob_can_return_directories_only(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
(tmp_path / "src" / "api").mkdir(parents=True)
|
|
(tmp_path / "src" / "api" / "handlers.py").write_text("ok\n", encoding="utf-8")
|
|
|
|
tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="api",
|
|
path="src",
|
|
entry_type="dirs",
|
|
)
|
|
|
|
assert result.splitlines() == ["src/api/"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_respects_glob_filter_and_context(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
(tmp_path / "src" / "main.py").write_text(
|
|
"alpha\nbeta\nmatch_here\ngamma\n",
|
|
encoding="utf-8",
|
|
)
|
|
(tmp_path / "README.md").write_text("match_here\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="match_here",
|
|
path=".",
|
|
glob="*.py",
|
|
output_mode="content",
|
|
context_before=1,
|
|
context_after=1,
|
|
)
|
|
|
|
assert "src/main.py:3" in result
|
|
assert " 2| beta" in result
|
|
assert "> 3| match_here" in result
|
|
assert " 4| gamma" in result
|
|
assert "README.md" not in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_defaults_to_files_with_matches(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
(tmp_path / "src" / "main.py").write_text("match_here\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="match_here",
|
|
path="src",
|
|
)
|
|
|
|
assert result.splitlines() == ["src/main.py"]
|
|
assert "1|" not in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_supports_case_insensitive_search(tmp_path: Path) -> None:
|
|
(tmp_path / "memory").mkdir()
|
|
(tmp_path / "memory" / "HISTORY.md").write_text(
|
|
"[2026-04-02 10:00] OAuth token rotated\n",
|
|
encoding="utf-8",
|
|
)
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="oauth",
|
|
path="memory/HISTORY.md",
|
|
case_insensitive=True,
|
|
output_mode="content",
|
|
)
|
|
|
|
assert "memory/HISTORY.md:1" in result
|
|
assert "OAuth token rotated" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_type_filter_limits_files(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
(tmp_path / "src" / "a.py").write_text("needle\n", encoding="utf-8")
|
|
(tmp_path / "src" / "b.md").write_text("needle\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="needle",
|
|
path="src",
|
|
type="py",
|
|
)
|
|
|
|
assert result.splitlines() == ["src/a.py"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_fixed_strings_treats_regex_chars_literally(tmp_path: Path) -> None:
|
|
(tmp_path / "memory").mkdir()
|
|
(tmp_path / "memory" / "HISTORY.md").write_text(
|
|
"[2026-04-02 10:00] OAuth token rotated\n",
|
|
encoding="utf-8",
|
|
)
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="[2026-04-02 10:00]",
|
|
path="memory/HISTORY.md",
|
|
fixed_strings=True,
|
|
output_mode="content",
|
|
)
|
|
|
|
assert "memory/HISTORY.md:1" in result
|
|
assert "[2026-04-02 10:00] OAuth token rotated" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_files_with_matches_mode_returns_unique_paths(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
a = tmp_path / "src" / "a.py"
|
|
b = tmp_path / "src" / "b.py"
|
|
a.write_text("needle\nneedle\n", encoding="utf-8")
|
|
b.write_text("needle\n", encoding="utf-8")
|
|
os.utime(a, (1, 1))
|
|
os.utime(b, (2, 2))
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="needle",
|
|
path="src",
|
|
output_mode="files_with_matches",
|
|
)
|
|
|
|
assert result.splitlines() == ["src/b.py", "src/a.py"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_files_with_matches_supports_head_limit_and_offset(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
for name in ("a.py", "b.py", "c.py"):
|
|
(tmp_path / "src" / name).write_text("needle\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="needle",
|
|
path="src",
|
|
head_limit=1,
|
|
offset=1,
|
|
)
|
|
|
|
lines = result.splitlines()
|
|
assert lines[0] == "src/b.py"
|
|
assert "pagination: limit=1, offset=1" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_count_mode_reports_counts_per_file(tmp_path: Path) -> None:
|
|
(tmp_path / "logs").mkdir()
|
|
(tmp_path / "logs" / "one.log").write_text("warn\nok\nwarn\n", encoding="utf-8")
|
|
(tmp_path / "logs" / "two.log").write_text("warn\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="warn",
|
|
path="logs",
|
|
output_mode="count",
|
|
)
|
|
|
|
assert "logs/one.log: 2" in result
|
|
assert "logs/two.log: 1" in result
|
|
assert "total matches: 3 in 2 files" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_files_with_matches_mode_respects_max_results(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
files = []
|
|
for idx, name in enumerate(("a.py", "b.py", "c.py"), start=1):
|
|
file_path = tmp_path / "src" / name
|
|
file_path.write_text("needle\n", encoding="utf-8")
|
|
os.utime(file_path, (idx, idx))
|
|
files.append(file_path)
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="needle",
|
|
path="src",
|
|
output_mode="files_with_matches",
|
|
max_results=2,
|
|
)
|
|
|
|
assert result.splitlines()[:2] == ["src/c.py", "src/b.py"]
|
|
assert "pagination: limit=2, offset=0" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_glob_supports_head_limit_offset_and_recent_first(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
a = tmp_path / "src" / "a.py"
|
|
b = tmp_path / "src" / "b.py"
|
|
c = tmp_path / "src" / "c.py"
|
|
a.write_text("a\n", encoding="utf-8")
|
|
b.write_text("b\n", encoding="utf-8")
|
|
c.write_text("c\n", encoding="utf-8")
|
|
|
|
os.utime(a, (1, 1))
|
|
os.utime(b, (2, 2))
|
|
os.utime(c, (3, 3))
|
|
|
|
tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="*.py",
|
|
path="src",
|
|
head_limit=1,
|
|
offset=1,
|
|
)
|
|
|
|
lines = result.splitlines()
|
|
assert lines[0] == "src/b.py"
|
|
assert "pagination: limit=1, offset=1" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_reports_skipped_binary_and_large_files(
|
|
tmp_path: Path,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
(tmp_path / "binary.bin").write_bytes(b"\x00\x01\x02")
|
|
(tmp_path / "large.txt").write_text("x" * 20, encoding="utf-8")
|
|
|
|
monkeypatch.setattr(GrepTool, "_MAX_FILE_BYTES", 10)
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(pattern="needle", path=".")
|
|
|
|
assert "No matches found" in result
|
|
assert "skipped 1 binary/unreadable files" in result
|
|
assert "skipped 1 large files" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_search_tools_reject_paths_outside_workspace(tmp_path: Path) -> None:
|
|
outside = tmp_path.parent / "outside-search.txt"
|
|
outside.write_text("secret\n", encoding="utf-8")
|
|
|
|
grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
glob_tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
|
|
grep_result = await grep_tool.execute(pattern="secret", path=str(outside))
|
|
glob_result = await glob_tool.execute(pattern="*.txt", path=str(outside.parent))
|
|
|
|
assert grep_result.startswith("Error:")
|
|
assert glob_result.startswith("Error:")
|
|
|
|
|
|
def test_agent_loop_registers_grep_and_glob(tmp_path: Path) -> None:
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
|
|
|
|
assert "grep" in loop.tools.tool_names
|
|
assert "glob" in loop.tools.tool_names
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subagent_registers_grep_and_glob(tmp_path: Path) -> None:
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=4096,
|
|
)
|
|
captured: dict[str, list[str]] = {}
|
|
|
|
async def fake_run(spec):
|
|
captured["tool_names"] = spec.tools.tool_names
|
|
return SimpleNamespace(
|
|
stop_reason="ok",
|
|
final_content="done",
|
|
tool_events=[],
|
|
error=None,
|
|
)
|
|
|
|
mgr.runner.run = fake_run
|
|
mgr._announce_result = AsyncMock()
|
|
|
|
await mgr._run_subagent("sub-1", "search task", "label", {"channel": "cli", "chat_id": "direct"})
|
|
|
|
assert "grep" in captured["tool_names"]
|
|
assert "glob" in captured["tool_names"]
|