mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-19 16:12:30 +00:00
GlobTool is redundant — GrepTool already supports glob-based file filtering via its `glob` parameter, making a standalone glob-only tool unnecessary. Removing it simplifies the tool surface and reduces LLM confusion between glob and grep.
308 lines
9.6 KiB
Python
308 lines
9.6 KiB
Python
"""Tests for grep search tools."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.agent.subagent import SubagentManager, SubagentStatus
|
|
from nanobot.agent.tools.search import GrepTool
|
|
from nanobot.agent.tools.web import WebSearchTool
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.config.schema import WebSearchConfig
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_web_search_tool_refreshes_dynamic_config_loader(monkeypatch) -> None:
|
|
tool = WebSearchTool(
|
|
config=WebSearchConfig(provider="brave"),
|
|
config_loader=lambda: WebSearchConfig(provider="duckduckgo", max_results=3),
|
|
)
|
|
|
|
async def fake_duckduckgo(self, query: str, n: int) -> str:
|
|
return f"{self.config.provider}:{query}:{n}"
|
|
|
|
monkeypatch.setattr(WebSearchTool, "_search_duckduckgo", fake_duckduckgo)
|
|
|
|
assert await tool.execute("nanobot") == "duckduckgo:nanobot:3"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_respects_glob_filter_and_context(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
(tmp_path / "src" / "main.py").write_text(
|
|
"alpha\nbeta\nmatch_here\ngamma\n",
|
|
encoding="utf-8",
|
|
)
|
|
(tmp_path / "README.md").write_text("match_here\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="match_here",
|
|
path=".",
|
|
glob="*.py",
|
|
output_mode="content",
|
|
context_before=1,
|
|
context_after=1,
|
|
)
|
|
|
|
assert "src/main.py:3" in result
|
|
assert " 2| beta" in result
|
|
assert "> 3| match_here" in result
|
|
assert " 4| gamma" in result
|
|
assert "README.md" not in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_defaults_to_files_with_matches(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
(tmp_path / "src" / "main.py").write_text("match_here\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="match_here",
|
|
path="src",
|
|
)
|
|
|
|
assert result.splitlines() == ["src/main.py"]
|
|
assert "1|" not in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_supports_case_insensitive_search(tmp_path: Path) -> None:
|
|
(tmp_path / "memory").mkdir()
|
|
(tmp_path / "memory" / "HISTORY.md").write_text(
|
|
"[2026-04-02 10:00] OAuth token rotated\n",
|
|
encoding="utf-8",
|
|
)
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="oauth",
|
|
path="memory/HISTORY.md",
|
|
case_insensitive=True,
|
|
output_mode="content",
|
|
)
|
|
|
|
assert "memory/HISTORY.md:1" in result
|
|
assert "OAuth token rotated" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_type_filter_limits_files(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
(tmp_path / "src" / "a.py").write_text("needle\n", encoding="utf-8")
|
|
(tmp_path / "src" / "b.md").write_text("needle\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="needle",
|
|
path="src",
|
|
type="py",
|
|
)
|
|
|
|
assert result.splitlines() == ["src/a.py"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_fixed_strings_treats_regex_chars_literally(tmp_path: Path) -> None:
|
|
(tmp_path / "memory").mkdir()
|
|
(tmp_path / "memory" / "HISTORY.md").write_text(
|
|
"[2026-04-02 10:00] OAuth token rotated\n",
|
|
encoding="utf-8",
|
|
)
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="[2026-04-02 10:00]",
|
|
path="memory/HISTORY.md",
|
|
fixed_strings=True,
|
|
output_mode="content",
|
|
)
|
|
|
|
assert "memory/HISTORY.md:1" in result
|
|
assert "[2026-04-02 10:00] OAuth token rotated" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_files_with_matches_mode_returns_unique_paths(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
a = tmp_path / "src" / "a.py"
|
|
b = tmp_path / "src" / "b.py"
|
|
a.write_text("needle\nneedle\n", encoding="utf-8")
|
|
b.write_text("needle\n", encoding="utf-8")
|
|
os.utime(a, (1, 1))
|
|
os.utime(b, (2, 2))
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="needle",
|
|
path="src",
|
|
output_mode="files_with_matches",
|
|
)
|
|
|
|
assert result.splitlines() == ["src/b.py", "src/a.py"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_files_with_matches_supports_head_limit_and_offset(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
for name in ("a.py", "b.py", "c.py"):
|
|
(tmp_path / "src" / name).write_text("needle\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="needle",
|
|
path="src",
|
|
head_limit=1,
|
|
offset=1,
|
|
)
|
|
|
|
# Filesystem order is not deterministic across platforms, so just verify:
|
|
# 1. Only one file path is returned (head_limit=1 after offset=1)
|
|
# 2. The pagination info is correct
|
|
assert "pagination: limit=1, offset=1" in result
|
|
# Count non-empty lines that start with src/ (file paths)
|
|
file_lines = [line for line in result.splitlines() if line.startswith("src/")]
|
|
assert len(file_lines) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_count_mode_reports_counts_per_file(tmp_path: Path) -> None:
|
|
(tmp_path / "logs").mkdir()
|
|
(tmp_path / "logs" / "one.log").write_text("warn\nok\nwarn\n", encoding="utf-8")
|
|
(tmp_path / "logs" / "two.log").write_text("warn\n", encoding="utf-8")
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="warn",
|
|
path="logs",
|
|
output_mode="count",
|
|
)
|
|
|
|
assert "logs/one.log: 2" in result
|
|
assert "logs/two.log: 1" in result
|
|
assert "total matches: 3 in 2 files" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_files_with_matches_mode_respects_max_results(tmp_path: Path) -> None:
|
|
(tmp_path / "src").mkdir()
|
|
files = []
|
|
for idx, name in enumerate(("a.py", "b.py", "c.py"), start=1):
|
|
file_path = tmp_path / "src" / name
|
|
file_path.write_text("needle\n", encoding="utf-8")
|
|
os.utime(file_path, (idx, idx))
|
|
files.append(file_path)
|
|
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(
|
|
pattern="needle",
|
|
path="src",
|
|
output_mode="files_with_matches",
|
|
max_results=2,
|
|
)
|
|
|
|
assert result.splitlines()[:2] == ["src/c.py", "src/b.py"]
|
|
assert "pagination: limit=2, offset=0" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_reports_skipped_binary_and_large_files(
|
|
tmp_path: Path,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
(tmp_path / "binary.bin").write_bytes(b"\x00\x01\x02")
|
|
(tmp_path / "large.txt").write_text("x" * 20, encoding="utf-8")
|
|
|
|
monkeypatch.setattr(GrepTool, "_MAX_FILE_BYTES", 10)
|
|
tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
result = await tool.execute(pattern="needle", path=".")
|
|
|
|
assert "No matches found" in result
|
|
assert "skipped 1 binary/unreadable files" in result
|
|
assert "skipped 1 large files" in result
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_search_tools_reject_paths_outside_workspace(tmp_path: Path) -> None:
|
|
outside = tmp_path.parent / "outside-search.txt"
|
|
outside.write_text("secret\n", encoding="utf-8")
|
|
|
|
grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
|
|
|
|
grep_result = await grep_tool.execute(pattern="secret", path=str(outside))
|
|
|
|
assert grep_result.startswith("Error:")
|
|
|
|
|
|
def test_agent_loop_registers_grep(tmp_path: Path) -> None:
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
|
|
|
|
assert "grep" in loop.tools.tool_names
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subagent_registers_grep(tmp_path: Path) -> None:
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=4096,
|
|
)
|
|
captured: dict[str, list[str]] = {}
|
|
|
|
async def fake_run(spec):
|
|
captured["tool_names"] = spec.tools.tool_names
|
|
return SimpleNamespace(
|
|
stop_reason="ok",
|
|
final_content="done",
|
|
tool_events=[],
|
|
error=None,
|
|
)
|
|
|
|
mgr.runner.run = fake_run
|
|
mgr._announce_result = AsyncMock()
|
|
|
|
status = SubagentStatus(task_id="sub-1", label="label", task_description="search task", started_at=time.monotonic())
|
|
await mgr._run_subagent("sub-1", "search task", "label", {"channel": "cli", "chat_id": "direct"}, status)
|
|
|
|
assert "grep" in captured["tool_names"]
|
|
|
|
|
|
def test_subagent_prompt_respects_disabled_skills(tmp_path: Path) -> None:
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
skills_dir = tmp_path / "skills"
|
|
(skills_dir / "alpha").mkdir(parents=True)
|
|
(skills_dir / "alpha" / "SKILL.md").write_text("# Alpha\n\nhidden\n", encoding="utf-8")
|
|
(skills_dir / "beta").mkdir(parents=True)
|
|
(skills_dir / "beta" / "SKILL.md").write_text("# Beta\n\nshown\n", encoding="utf-8")
|
|
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=4096,
|
|
disabled_skills=["alpha"],
|
|
)
|
|
|
|
prompt = mgr._build_subagent_prompt()
|
|
|
|
assert "alpha" not in prompt
|
|
assert "beta" in prompt
|