nanobot/tests/tools/test_search_tools.py
chengyongru 92be47247e feat(agent): add SelfTool for runtime self-inspection and configuration
Add a built-in tool that lets the agent inspect and modify its own
runtime state (model, iterations, context window, etc.).

Key features:
- inspect: view current config, usage stats, and subagent status
- modify: adjust parameters at runtime (protected by type/range validation)
- Subagent observability: inspect running subagent tasks (phase,
  iteration, tool events, errors) — subagents are no longer a black box
- Watchdog corrects out-of-bounds values on each iteration
- Enabled by default in read-only mode (self_modify: false)
- All changes are in-memory only; restart restores defaults
- Comprehensive test suite (90 tests)

Includes a self-awareness skill (always-on) with progressive disclosure:
SKILL.md for core rules, references/examples.md for detailed scenarios.
2026-04-16 00:37:34 +08:00

352 lines
11 KiB
Python

"""Tests for grep/glob search tools."""
from __future__ import annotations
import os
import time
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from nanobot.agent.loop import AgentLoop
from nanobot.agent.subagent import SubagentManager, SubagentStatus
from nanobot.agent.tools.search import GlobTool, GrepTool
from nanobot.bus.queue import MessageBus
@pytest.mark.asyncio
async def test_glob_matches_recursively_and_skips_noise_dirs(tmp_path: Path) -> None:
    """Glob finds ``*.py`` files in nested directories but omits ``node_modules``."""
    # One fixture file per directory; the node_modules entry must not be reported.
    fixtures = {
        "src": ("app.py", "print('ok')\n"),
        "nested": ("util.py", "print('ok')\n"),
        "node_modules": ("skip.py", "print('skip')\n"),
    }
    for dirname in fixtures:
        (tmp_path / dirname).mkdir()
    for dirname, (filename, body) in fixtures.items():
        (tmp_path / dirname / filename).write_text(body, encoding="utf-8")

    glob_tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await glob_tool.execute(pattern="*.py", path=".")

    assert "src/app.py" in output
    assert "nested/util.py" in output
    assert "node_modules/skip.py" not in output
@pytest.mark.asyncio
async def test_glob_can_return_directories_only(tmp_path: Path) -> None:
    """With ``entry_type="dirs"`` glob lists the matching directory with a trailing slash."""
    api_dir = tmp_path / "src" / "api"
    api_dir.mkdir(parents=True)  # creates src/ and src/api/ in one step
    (api_dir / "handlers.py").write_text("ok\n", encoding="utf-8")

    glob_tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await glob_tool.execute(pattern="api", path="src", entry_type="dirs")

    assert output.splitlines() == ["src/api/"]
@pytest.mark.asyncio
async def test_grep_respects_glob_filter_and_context(tmp_path: Path) -> None:
    """Content-mode grep honours the ``glob`` filter and prints surrounding lines."""
    source_dir = tmp_path / "src"
    source_dir.mkdir()
    (source_dir / "main.py").write_text(
        "alpha\nbeta\nmatch_here\ngamma\n",
        encoding="utf-8",
    )
    # Also contains the pattern, but glob="*.py" must keep it out of the output.
    (tmp_path / "README.md").write_text("match_here\n", encoding="utf-8")

    grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    search_args = {
        "pattern": "match_here",
        "path": ".",
        "glob": "*.py",
        "output_mode": "content",
        "context_before": 1,
        "context_after": 1,
    }
    output = await grep_tool.execute(**search_args)

    # Match header, one line of context before, the hit itself, one line after.
    for fragment in ("src/main.py:3", " 2| beta", "> 3| match_here", " 4| gamma"):
        assert fragment in output
    assert "README.md" not in output
@pytest.mark.asyncio
async def test_grep_defaults_to_files_with_matches(tmp_path: Path) -> None:
    """Without an explicit ``output_mode`` grep returns matching paths, not line content."""
    target = tmp_path / "src" / "main.py"
    target.parent.mkdir()
    target.write_text("match_here\n", encoding="utf-8")

    grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await grep_tool.execute(pattern="match_here", path="src")

    assert output.splitlines() == ["src/main.py"]
    # No "<line number>|" prefix means no content lines were emitted.
    assert "1|" not in output
@pytest.mark.asyncio
async def test_grep_supports_case_insensitive_search(tmp_path: Path) -> None:
    """A lowercase pattern matches mixed-case text when ``case_insensitive=True``."""
    history = tmp_path / "memory" / "HISTORY.md"
    history.parent.mkdir()
    history.write_text("[2026-04-02 10:00] OAuth token rotated\n", encoding="utf-8")

    grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await grep_tool.execute(
        pattern="oauth",
        path="memory/HISTORY.md",
        case_insensitive=True,
        output_mode="content",
    )

    assert "memory/HISTORY.md:1" in output
    assert "OAuth token rotated" in output
@pytest.mark.asyncio
async def test_grep_type_filter_limits_files(tmp_path: Path) -> None:
    """``type="py"`` restricts results to the ``.py`` file even though both files match."""
    source_dir = tmp_path / "src"
    source_dir.mkdir()
    for filename in ("a.py", "b.md"):
        (source_dir / filename).write_text("needle\n", encoding="utf-8")

    grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await grep_tool.execute(pattern="needle", path="src", type="py")

    assert output.splitlines() == ["src/a.py"]
@pytest.mark.asyncio
async def test_grep_fixed_strings_treats_regex_chars_literally(tmp_path: Path) -> None:
    """With ``fixed_strings=True`` a bracketed pattern matches the same literal text."""
    history = tmp_path / "memory" / "HISTORY.md"
    history.parent.mkdir()
    history.write_text("[2026-04-02 10:00] OAuth token rotated\n", encoding="utf-8")

    grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    # The pattern contains "[" and "]", which are regex metacharacters.
    output = await grep_tool.execute(
        pattern="[2026-04-02 10:00]",
        path="memory/HISTORY.md",
        fixed_strings=True,
        output_mode="content",
    )

    assert "memory/HISTORY.md:1" in output
    assert "[2026-04-02 10:00] OAuth token rotated" in output
@pytest.mark.asyncio
async def test_grep_files_with_matches_mode_returns_unique_paths(tmp_path: Path) -> None:
    """Each matching file is listed once, with the larger-mtime file first."""
    source_dir = tmp_path / "src"
    source_dir.mkdir()
    older, newer = source_dir / "a.py", source_dir / "b.py"
    # a.py holds two hits but must still appear only once in the output.
    older.write_text("needle\nneedle\n", encoding="utf-8")
    newer.write_text("needle\n", encoding="utf-8")
    # Pin (atime, mtime) so the expected order is independent of write timing.
    os.utime(older, (1, 1))
    os.utime(newer, (2, 2))

    grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await grep_tool.execute(
        pattern="needle",
        path="src",
        output_mode="files_with_matches",
    )

    assert output.splitlines() == ["src/b.py", "src/a.py"]
@pytest.mark.asyncio
async def test_grep_files_with_matches_supports_head_limit_and_offset(tmp_path: Path) -> None:
    """``head_limit`` and ``offset`` page through the file list and report pagination.

    Modification times are pinned explicitly. The sibling grep tests in this
    module show the file list ordered by mtime (larger first), so leaving the
    timestamps to the filesystem would make the entry at ``offset=1`` depend
    on timestamp resolution and write speed.
    """
    source_dir = tmp_path / "src"
    source_dir.mkdir()
    for stamp, name in enumerate(("a.py", "b.py", "c.py"), start=1):
        file_path = source_dir / name
        file_path.write_text("needle\n", encoding="utf-8")
        # (atime, mtime): a=1, b=2, c=3, giving a deterministic order.
        os.utime(file_path, (stamp, stamp))

    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    result = await tool.execute(
        pattern="needle",
        path="src",
        head_limit=1,
        offset=1,
    )

    lines = result.splitlines()
    # Larger-mtime-first order is c, b, a; skipping one entry leaves b.
    assert lines[0] == "src/b.py"
    assert "pagination: limit=1, offset=1" in result
@pytest.mark.asyncio
async def test_grep_count_mode_reports_counts_per_file(tmp_path: Path) -> None:
    """Count mode prints a per-file match count plus an overall total line."""
    log_dir = tmp_path / "logs"
    log_dir.mkdir()
    (log_dir / "one.log").write_text("warn\nok\nwarn\n", encoding="utf-8")
    (log_dir / "two.log").write_text("warn\n", encoding="utf-8")

    grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await grep_tool.execute(pattern="warn", path="logs", output_mode="count")

    expected_lines = (
        "logs/one.log: 2",
        "logs/two.log: 1",
        "total matches: 3 in 2 files",
    )
    for expected in expected_lines:
        assert expected in output
@pytest.mark.asyncio
async def test_grep_files_with_matches_mode_respects_max_results(tmp_path: Path) -> None:
    """``max_results=2`` keeps the two larger-mtime files and reports the pagination used."""
    source_dir = tmp_path / "src"
    source_dir.mkdir()
    for idx, name in enumerate(("a.py", "b.py", "c.py"), start=1):
        file_path = source_dir / name
        file_path.write_text("needle\n", encoding="utf-8")
        # (atime, mtime): a=1, b=2, c=3, so c.py and b.py are the two newest.
        os.utime(file_path, (idx, idx))

    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    result = await tool.execute(
        pattern="needle",
        path="src",
        output_mode="files_with_matches",
        max_results=2,
    )

    # Only the first two lines are paths; the pagination note follows them.
    assert result.splitlines()[:2] == ["src/c.py", "src/b.py"]
    assert "pagination: limit=2, offset=0" in result
@pytest.mark.asyncio
async def test_glob_supports_head_limit_offset_and_recent_first(tmp_path: Path) -> None:
    """Glob pages through results ordered by mtime (larger first) via ``head_limit``/``offset``."""
    source_dir = tmp_path / "src"
    source_dir.mkdir()
    fixtures = (("a.py", "a\n"), ("b.py", "b\n"), ("c.py", "c\n"))
    paths = [source_dir / name for name, _ in fixtures]
    for file_path, (_, body) in zip(paths, fixtures):
        file_path.write_text(body, encoding="utf-8")
    # Pin (atime, mtime) to 1, 2, 3 so the ordering is deterministic.
    for stamp, file_path in enumerate(paths, start=1):
        os.utime(file_path, (stamp, stamp))

    glob_tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await glob_tool.execute(pattern="*.py", path="src", head_limit=1, offset=1)

    first_line = output.splitlines()[0]
    # With c, b, a ordering, skipping one entry leaves b.py.
    assert first_line == "src/b.py"
    assert "pagination: limit=1, offset=1" in output
@pytest.mark.asyncio
async def test_grep_reports_skipped_binary_and_large_files(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Grep reports how many files it skipped as binary/unreadable and as too large."""
    # Shrink the class-level byte limit to 10 so the 20-character file exceeds it.
    monkeypatch.setattr(GrepTool, "_MAX_FILE_BYTES", 10)
    (tmp_path / "binary.bin").write_bytes(b"\x00\x01\x02")
    (tmp_path / "large.txt").write_text("x" * 20, encoding="utf-8")

    grep_tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await grep_tool.execute(pattern="needle", path=".")

    expected_notes = (
        "No matches found",
        "skipped 1 binary/unreadable files",
        "skipped 1 large files",
    )
    for note in expected_notes:
        assert note in output
@pytest.mark.asyncio
async def test_search_tools_reject_paths_outside_workspace(tmp_path: Path) -> None:
    """Grep and glob return an ``Error:`` result for paths outside ``allowed_dir``.

    The workspace is a subdirectory of ``tmp_path``, so the "outside" file
    still lives inside this test's own temporary directory. Writing it to
    ``tmp_path.parent`` would leave a fixed-name file in a directory shared
    with other tests.
    """
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    outside = tmp_path / "outside-search.txt"
    outside.write_text("secret\n", encoding="utf-8")

    grep_tool = GrepTool(workspace=workspace, allowed_dir=workspace)
    glob_tool = GlobTool(workspace=workspace, allowed_dir=workspace)

    # Both targets (the file and its parent directory) sit above the workspace.
    grep_result = await grep_tool.execute(pattern="secret", path=str(outside))
    glob_result = await glob_tool.execute(pattern="*.txt", path=str(outside.parent))

    assert grep_result.startswith("Error:")
    assert glob_result.startswith("Error:")
def test_agent_loop_registers_grep_and_glob(tmp_path: Path) -> None:
    """A newly constructed ``AgentLoop`` lists both ``grep`` and ``glob`` among its tools."""
    message_bus = MessageBus()
    fake_provider = MagicMock()
    fake_provider.get_default_model.return_value = "test-model"

    agent_loop = AgentLoop(
        bus=message_bus,
        provider=fake_provider,
        workspace=tmp_path,
        model="test-model",
    )

    for tool_name in ("grep", "glob"):
        assert tool_name in agent_loop.tools.tool_names
@pytest.mark.asyncio
async def test_subagent_registers_grep_and_glob(tmp_path: Path) -> None:
    """The run spec handed to the subagent runner carries the grep and glob tools."""
    message_bus = MessageBus()
    fake_provider = MagicMock()
    fake_provider.get_default_model.return_value = "test-model"
    manager = SubagentManager(
        provider=fake_provider,
        workspace=tmp_path,
        bus=message_bus,
        max_tool_result_chars=4096,
    )
    captured: dict[str, list[str]] = {}

    async def fake_run(spec):
        """Record the spec's tool names and return a canned result object."""
        captured["tool_names"] = spec.tools.tool_names
        canned = SimpleNamespace(
            stop_reason="ok",
            final_content="done",
            tool_events=[],
            error=None,
        )
        return canned

    # Swap in the recording runner and a mock for the result announcement.
    manager.runner.run = fake_run
    manager._announce_result = AsyncMock()

    status = SubagentStatus(
        task_id="sub-1",
        label="label",
        task_description="search task",
        started_at=time.monotonic(),
    )
    origin = {"channel": "cli", "chat_id": "direct"}
    await manager._run_subagent("sub-1", "search task", "label", origin, status)

    assert "grep" in captured["tool_names"]
    assert "glob" in captured["tool_names"]
def test_subagent_prompt_respects_disabled_skills(tmp_path: Path) -> None:
    """A skill named in ``disabled_skills`` is absent from the subagent prompt; others remain."""
    message_bus = MessageBus()
    fake_provider = MagicMock()
    fake_provider.get_default_model.return_value = "test-model"

    # Two skills on disk: "alpha" is disabled below, "beta" stays enabled.
    skill_docs = (
        ("alpha", "# Alpha\n\nhidden\n"),
        ("beta", "# Beta\n\nshown\n"),
    )
    for skill_name, skill_doc in skill_docs:
        skill_dir = tmp_path / "skills" / skill_name
        skill_dir.mkdir(parents=True)
        (skill_dir / "SKILL.md").write_text(skill_doc, encoding="utf-8")

    manager = SubagentManager(
        provider=fake_provider,
        workspace=tmp_path,
        bus=message_bus,
        max_tool_result_chars=4096,
        disabled_skills=["alpha"],
    )
    subagent_prompt = manager._build_subagent_prompt()

    assert "alpha" not in subagent_prompt
    assert "beta" in subagent_prompt