Merge PR #2754: feat(agent): add built-in grep and glob search tools

feat(agent): add built-in grep and glob search tools
This commit is contained in:
Xubin Ren 2026-04-04 23:30:18 +08:00 committed by GitHub
commit 04a41e31ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 926 additions and 6 deletions

View File

@ -23,6 +23,7 @@ from nanobot.agent.skills import BUILTIN_SKILLS_DIR
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.search import GlobTool, GrepTool
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
@ -266,6 +267,8 @@ class AgentLoop:
self.tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir, extra_allowed_dirs=extra_read))
for cls in (WriteFileTool, EditFileTool, ListDirTool):
self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
for cls in (GlobTool, GrepTool):
self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
if self.exec_config.enable:
self.tools.register(ExecTool(
working_dir=str(self.workspace),

View File

@ -14,6 +14,7 @@ from nanobot.agent.runner import AgentRunSpec, AgentRunner
from nanobot.agent.skills import BUILTIN_SKILLS_DIR
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.search import GlobTool, GrepTool
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage
@ -116,6 +117,8 @@ class SubagentManager:
tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(GlobTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(GrepTool(workspace=self.workspace, allowed_dir=allowed_dir))
if self.exec_config.enable:
tools.register(ExecTool(
working_dir=str(self.workspace),

View File

@ -0,0 +1,553 @@
"""Search tools: grep and glob."""
from __future__ import annotations
import fnmatch
import os
import re
from pathlib import Path, PurePosixPath
from typing import Any, Iterable, TypeVar
from nanobot.agent.tools.filesystem import ListDirTool, _FsTool
_DEFAULT_HEAD_LIMIT = 250
T = TypeVar("T")
# Map of file-type shorthands (accepted by GrepTool's `type` parameter) to the
# filename glob patterns they expand to. Unknown shorthands fall back to
# "*.{type}" in _matches_type, so only multi-extension types need entries here.
_TYPE_GLOB_MAP = {
    "py": ("*.py", "*.pyi"),
    "python": ("*.py", "*.pyi"),
    "js": ("*.js", "*.jsx", "*.mjs", "*.cjs"),
    "ts": ("*.ts", "*.tsx", "*.mts", "*.cts"),
    "tsx": ("*.tsx",),
    "jsx": ("*.jsx",),
    "json": ("*.json",),
    "md": ("*.md", "*.mdx"),
    "markdown": ("*.md", "*.mdx"),
    "go": ("*.go",),
    "rs": ("*.rs",),
    "rust": ("*.rs",),
    "java": ("*.java",),
    "sh": ("*.sh", "*.bash"),
    "yaml": ("*.yaml", "*.yml"),
    "yml": ("*.yaml", "*.yml"),
    "toml": ("*.toml",),
    "sql": ("*.sql",),
    "html": ("*.html", "*.htm"),
    "css": ("*.css", "*.scss", "*.sass"),
}
def _normalize_pattern(pattern: str) -> str:
return pattern.strip().replace("\\", "/")
def _match_glob(rel_path: str, name: str, pattern: str) -> bool:
    """Check *pattern* against either the relative path or the bare filename.

    Patterns containing a path separator (or starting with '**') are matched
    against the full relative path; plain patterns like '*.py' match the
    filename alone, which makes simple globs recursive by default.
    """
    cleaned = _normalize_pattern(pattern)
    if not cleaned:
        return False
    if "/" in cleaned or cleaned.startswith("**"):
        # NOTE(review): PurePosixPath.match treats "**" like "*" before
        # Python 3.13, so deeply-nested recursive globs may not match as
        # users expect — confirm against the supported interpreter versions.
        return PurePosixPath(rel_path).match(cleaned)
    return fnmatch.fnmatch(name, cleaned)
def _is_binary(raw: bytes) -> bool:
if b"\x00" in raw:
return True
sample = raw[:4096]
if not sample:
return False
non_text = sum(byte < 9 or 13 < byte < 32 for byte in sample)
return (non_text / len(sample)) > 0.2
def _paginate(items: list[T], limit: int | None, offset: int) -> tuple[list[T], bool]:
if limit is None:
return items[offset:], False
sliced = items[offset : offset + limit]
truncated = len(items) > offset + limit
return sliced, truncated
def _pagination_note(limit: int | None, offset: int, truncated: bool) -> str | None:
if truncated:
if limit is None:
return f"(pagination: offset={offset})"
return f"(pagination: limit={limit}, offset={offset})"
if offset > 0:
return f"(pagination: offset={offset})"
return None
def _matches_type(name: str, file_type: str | None) -> bool:
    """Return True when *name* satisfies the optional file-type shorthand.

    A missing or blank *file_type* matches everything. Known shorthands expand
    via _TYPE_GLOB_MAP; anything else falls back to a plain "*.{type}" glob.
    Matching is case-insensitive on both sides.
    """
    if not file_type:
        return True
    key = file_type.strip().lower()
    if not key:
        return True
    globs = _TYPE_GLOB_MAP.get(key, (f"*.{key}",))
    lowered_name = name.lower()
    return any(fnmatch.fnmatch(lowered_name, g.lower()) for g in globs)
class _SearchTool(_FsTool):
    """Shared plumbing for the search tools: path display and tree traversal."""

    # Reuse the noise-directory ignore list (node_modules etc.) from ListDirTool
    # so all filesystem tools prune the same directories.
    _IGNORE_DIRS = set(ListDirTool._IGNORE_DIRS)

    def _display_path(self, target: Path, root: Path) -> str:
        """Render *target* relative to the workspace when possible, else *root*."""
        if self._workspace:
            try:
                return target.relative_to(self._workspace).as_posix()
            except ValueError:
                # Target lives outside the workspace; fall through to root-relative.
                pass
        return target.relative_to(root).as_posix()

    def _iter_files(self, root: Path) -> Iterable[Path]:
        """Yield files beneath *root* (or *root* itself when it is a file)."""
        yield from self._iter_entries(root, include_files=True, include_dirs=False)

    def _iter_entries(
        self,
        root: Path,
        *,
        include_files: bool,
        include_dirs: bool,
    ) -> Iterable[Path]:
        """Walk *root* top-down, pruning ignored dirs, yielding dirs and/or files.

        Directory and file names are yielded in sorted order per directory so
        output is deterministic across platforms.
        """
        if root.is_file():
            if include_files:
                yield root
            return
        for dirpath, dirnames, filenames in os.walk(root):
            # In-place prune + sort so os.walk never descends into ignored dirs
            # and visits the remainder deterministically.
            dirnames[:] = sorted(d for d in dirnames if d not in self._IGNORE_DIRS)
            base = Path(dirpath)
            if include_dirs:
                for sub in dirnames:
                    yield base / sub
            if include_files:
                for filename in sorted(filenames):
                    yield base / filename
class GlobTool(_SearchTool):
    """Find files matching a glob pattern.

    Matches are ordered most-recently-modified first and paginated with
    ``head_limit`` / ``offset``; ``entry_type`` selects files, dirs, or both.
    """
    @property
    def name(self) -> str:
        return "glob"
    @property
    def description(self) -> str:
        return (
            "Find files matching a glob pattern. "
            "Simple patterns like '*.py' match by filename recursively."
        )
    @property
    def read_only(self) -> bool:
        # Glob never mutates the filesystem.
        return True
    @property
    def parameters(self) -> dict[str, Any]:
        # JSON schema advertised to the model for tool invocation.
        return {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": "Glob pattern to match, e.g. '*.py' or 'tests/**/test_*.py'",
                    "minLength": 1,
                },
                "path": {
                    "type": "string",
                    "description": "Directory to search from (default '.')",
                },
                "max_results": {
                    "type": "integer",
                    "description": "Legacy alias for head_limit",
                    "minimum": 1,
                    "maximum": 1000,
                },
                "head_limit": {
                    "type": "integer",
                    "description": "Maximum number of matches to return (default 250)",
                    "minimum": 0,
                    "maximum": 1000,
                },
                "offset": {
                    "type": "integer",
                    "description": "Skip the first N matching entries before returning results",
                    "minimum": 0,
                    "maximum": 100000,
                },
                "entry_type": {
                    "type": "string",
                    "enum": ["files", "dirs", "both"],
                    "description": "Whether to match files, directories, or both (default files)",
                },
            },
            "required": ["pattern"],
        }
    async def execute(
        self,
        pattern: str,
        path: str = ".",
        max_results: int | None = None,
        head_limit: int | None = None,
        offset: int = 0,
        entry_type: str = "files",
        **kwargs: Any,
    ) -> str:
        """Run the glob search and return newline-separated matches.

        Failures are returned as ``Error: ...`` strings rather than raised, so
        the agent loop can surface them as ordinary tool output.
        """
        try:
            root = self._resolve(path or ".")
            if not root.exists():
                return f"Error: Path not found: {path}"
            if not root.is_dir():
                return f"Error: Not a directory: {path}"
            # head_limit wins over the legacy max_results alias; an explicit
            # head_limit of 0 disables the limit entirely.
            if head_limit is not None:
                limit = None if head_limit == 0 else head_limit
            elif max_results is not None:
                limit = max_results
            else:
                limit = _DEFAULT_HEAD_LIMIT
            include_files = entry_type in {"files", "both"}
            include_dirs = entry_type in {"dirs", "both"}
            # Collect (display_path, mtime) pairs; sorting happens after the
            # full walk so pagination is stable.
            matches: list[tuple[str, float]] = []
            for entry in self._iter_entries(
                root,
                include_files=include_files,
                include_dirs=include_dirs,
            ):
                rel_path = entry.relative_to(root).as_posix()
                if _match_glob(rel_path, entry.name, pattern):
                    display = self._display_path(entry, root)
                    if entry.is_dir():
                        # Trailing slash marks directories in the output.
                        display += "/"
                    try:
                        mtime = entry.stat().st_mtime
                    except OSError:
                        # Unstat-able entries sort last (mtime 0) but still appear.
                        mtime = 0.0
                    matches.append((display, mtime))
            if not matches:
                return f"No paths matched pattern '{pattern}' in {path}"
            # Most recently modified first; ties break alphabetically.
            matches.sort(key=lambda item: (-item[1], item[0]))
            ordered = [name for name, _ in matches]
            paged, truncated = _paginate(ordered, limit, offset)
            result = "\n".join(paged)
            if note := _pagination_note(limit, offset, truncated):
                result += f"\n\n{note}"
            return result
        except PermissionError as e:
            return f"Error: {e}"
        except Exception as e:
            return f"Error finding files: {e}"
class GrepTool(_SearchTool):
    """Search file contents using a regex-like pattern.

    Supports three output modes (content / files_with_matches / count), glob
    and file-type filtering, context lines, and head_limit/offset pagination.
    Binary, oversized, and unreadable files are skipped and reported in the
    trailing notes rather than aborting the search.
    """
    # Hard cap on total rendered output in content mode.
    _MAX_RESULT_CHARS = 128_000
    # Files larger than this are skipped entirely (counted as "large").
    _MAX_FILE_BYTES = 2_000_000
    @property
    def name(self) -> str:
        return "grep"
    @property
    def description(self) -> str:
        return (
            "Search file contents with a regex-like pattern. "
            "Supports optional glob filtering, structured output modes, "
            "type filters, pagination, and surrounding context lines."
        )
    @property
    def read_only(self) -> bool:
        # Grep never mutates the filesystem.
        return True
    @property
    def parameters(self) -> dict[str, Any]:
        # JSON schema advertised to the model for tool invocation.
        return {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": "Regex or plain text pattern to search for",
                    "minLength": 1,
                },
                "path": {
                    "type": "string",
                    "description": "File or directory to search in (default '.')",
                },
                "glob": {
                    "type": "string",
                    "description": "Optional file filter, e.g. '*.py' or 'tests/**/test_*.py'",
                },
                "type": {
                    "type": "string",
                    "description": "Optional file type shorthand, e.g. 'py', 'ts', 'md', 'json'",
                },
                "case_insensitive": {
                    "type": "boolean",
                    "description": "Case-insensitive search (default false)",
                },
                "fixed_strings": {
                    "type": "boolean",
                    "description": "Treat pattern as plain text instead of regex (default false)",
                },
                "output_mode": {
                    "type": "string",
                    "enum": ["content", "files_with_matches", "count"],
                    "description": (
                        "content: matching lines with optional context; "
                        "files_with_matches: only matching file paths; "
                        "count: matching line counts per file. "
                        "Default: files_with_matches"
                    ),
                },
                "context_before": {
                    "type": "integer",
                    "description": "Number of lines of context before each match",
                    "minimum": 0,
                    "maximum": 20,
                },
                "context_after": {
                    "type": "integer",
                    "description": "Number of lines of context after each match",
                    "minimum": 0,
                    "maximum": 20,
                },
                "max_matches": {
                    "type": "integer",
                    "description": (
                        "Legacy alias for head_limit in content mode"
                    ),
                    "minimum": 1,
                    "maximum": 1000,
                },
                "max_results": {
                    "type": "integer",
                    "description": (
                        "Legacy alias for head_limit in files_with_matches or count mode"
                    ),
                    "minimum": 1,
                    "maximum": 1000,
                },
                "head_limit": {
                    "type": "integer",
                    "description": (
                        "Maximum number of results to return. In content mode this limits "
                        "matching line blocks; in other modes it limits file entries. "
                        "Default 250"
                    ),
                    "minimum": 0,
                    "maximum": 1000,
                },
                "offset": {
                    "type": "integer",
                    "description": "Skip the first N results before applying head_limit",
                    "minimum": 0,
                    "maximum": 100000,
                },
            },
            "required": ["pattern"],
        }
    @staticmethod
    def _format_block(
        display_path: str,
        lines: list[str],
        match_line: int,
        before: int,
        after: int,
    ) -> str:
        """Render one match (1-based *match_line*) with its context window.

        Lines are prefixed '>' for the match and ' ' for context, followed by
        the line number and the source text.
        """
        start = max(1, match_line - before)
        end = min(len(lines), match_line + after)
        block = [f"{display_path}:{match_line}"]
        for line_no in range(start, end + 1):
            marker = ">" if line_no == match_line else " "
            block.append(f"{marker} {line_no}| {lines[line_no - 1]}")
        return "\n".join(block)
    async def execute(
        self,
        pattern: str,
        path: str = ".",
        glob: str | None = None,
        type: str | None = None,
        case_insensitive: bool = False,
        fixed_strings: bool = False,
        output_mode: str = "files_with_matches",
        context_before: int = 0,
        context_after: int = 0,
        max_matches: int | None = None,
        max_results: int | None = None,
        head_limit: int | None = None,
        offset: int = 0,
        **kwargs: Any,
    ) -> str:
        """Run the search and return formatted results for *output_mode*.

        Failures are returned as ``Error: ...`` strings rather than raised, so
        the agent loop can surface them as ordinary tool output.
        """
        try:
            target = self._resolve(path or ".")
            if not target.exists():
                return f"Error: Path not found: {path}"
            if not (target.is_dir() or target.is_file()):
                return f"Error: Unsupported path: {path}"
            flags = re.IGNORECASE if case_insensitive else 0
            try:
                # fixed_strings escapes regex metacharacters for literal search.
                needle = re.escape(pattern) if fixed_strings else pattern
                regex = re.compile(needle, flags)
            except re.error as e:
                return f"Error: invalid regex pattern: {e}"
            # head_limit wins over the mode-specific legacy aliases; an
            # explicit head_limit of 0 disables the limit entirely.
            if head_limit is not None:
                limit = None if head_limit == 0 else head_limit
            elif output_mode == "content" and max_matches is not None:
                limit = max_matches
            elif output_mode != "content" and max_results is not None:
                limit = max_results
            else:
                limit = _DEFAULT_HEAD_LIMIT
            blocks: list[str] = []          # rendered match blocks (content mode)
            result_chars = 0                # running size of content output
            seen_content_matches = 0        # total matches seen, for offset skipping
            truncated = False               # hit the limit (content mode, in-loop)
            size_truncated = False          # hit _MAX_RESULT_CHARS
            skipped_binary = 0              # binary / undecodable / unreadable files
            skipped_large = 0               # files over _MAX_FILE_BYTES
            matching_files: list[str] = []  # ordered unique paths with matches
            counts: dict[str, int] = {}     # per-file match counts (count mode)
            file_mtimes: dict[str, float] = {}
            root = target if target.is_dir() else target.parent
            for file_path in self._iter_files(target):
                rel_path = file_path.relative_to(root).as_posix()
                if glob and not _match_glob(rel_path, file_path.name, glob):
                    continue
                if not _matches_type(file_path.name, type):
                    continue
                try:
                    raw = file_path.read_bytes()
                except OSError:
                    # Fix: an unreadable file (permission denied, dangling
                    # symlink, ...) previously escaped to the outer handler and
                    # aborted the whole search; count it as skipped instead,
                    # matching the "binary/unreadable" note below.
                    skipped_binary += 1
                    continue
                if len(raw) > self._MAX_FILE_BYTES:
                    skipped_large += 1
                    continue
                if _is_binary(raw):
                    skipped_binary += 1
                    continue
                try:
                    mtime = file_path.stat().st_mtime
                except OSError:
                    mtime = 0.0
                try:
                    content = raw.decode("utf-8")
                except UnicodeDecodeError:
                    skipped_binary += 1
                    continue
                lines = content.splitlines()
                display_path = self._display_path(file_path, root)
                file_had_match = False
                for idx, line in enumerate(lines, start=1):
                    if not regex.search(line):
                        continue
                    file_had_match = True
                    if output_mode == "count":
                        counts[display_path] = counts.get(display_path, 0) + 1
                        continue
                    if output_mode == "files_with_matches":
                        # One hit is enough for this mode; stop scanning the file.
                        if display_path not in matching_files:
                            matching_files.append(display_path)
                            file_mtimes[display_path] = mtime
                        break
                    # --- content mode from here on ---
                    seen_content_matches += 1
                    if seen_content_matches <= offset:
                        continue
                    if limit is not None and len(blocks) >= limit:
                        truncated = True
                        break
                    block = self._format_block(
                        display_path,
                        lines,
                        idx,
                        context_before,
                        context_after,
                    )
                    extra_sep = 2 if blocks else 0  # "\n\n" joiner between blocks
                    if result_chars + extra_sep + len(block) > self._MAX_RESULT_CHARS:
                        size_truncated = True
                        break
                    blocks.append(block)
                    result_chars += extra_sep + len(block)
                if output_mode == "count" and file_had_match:
                    # Track ordering metadata so count output sorts like
                    # files_with_matches (recent first).
                    if display_path not in matching_files:
                        matching_files.append(display_path)
                        file_mtimes[display_path] = mtime
                if output_mode in {"count", "files_with_matches"} and file_had_match:
                    # Safeguard: the truncation flags below only apply to
                    # content mode, so skip straight to the next file.
                    continue
                if truncated or size_truncated:
                    break
            if output_mode == "files_with_matches":
                if not matching_files:
                    result = f"No matches found for pattern '{pattern}' in {path}"
                else:
                    ordered_files = sorted(
                        matching_files,
                        key=lambda name: (-file_mtimes.get(name, 0.0), name),
                    )
                    paged, truncated = _paginate(ordered_files, limit, offset)
                    result = "\n".join(paged)
            elif output_mode == "count":
                if not counts:
                    result = f"No matches found for pattern '{pattern}' in {path}"
                else:
                    ordered_files = sorted(
                        matching_files,
                        key=lambda name: (-file_mtimes.get(name, 0.0), name),
                    )
                    ordered, truncated = _paginate(ordered_files, limit, offset)
                    lines = [f"{name}: {counts[name]}" for name in ordered]
                    result = "\n".join(lines)
            else:
                if not blocks:
                    result = f"No matches found for pattern '{pattern}' in {path}"
                else:
                    result = "\n\n".join(blocks)
            # Assemble trailing notes: pagination hints first, then skip
            # counters, then the count-mode summary line.
            notes: list[str] = []
            if output_mode == "content" and truncated:
                notes.append(
                    f"(pagination: limit={limit}, offset={offset})"
                )
            elif output_mode == "content" and size_truncated:
                notes.append("(output truncated due to size)")
            elif truncated and output_mode in {"count", "files_with_matches"}:
                notes.append(
                    f"(pagination: limit={limit}, offset={offset})"
                )
            elif output_mode in {"count", "files_with_matches"} and offset > 0:
                notes.append(f"(pagination: offset={offset})")
            elif output_mode == "content" and offset > 0 and blocks:
                notes.append(f"(pagination: offset={offset})")
            if skipped_binary:
                notes.append(f"(skipped {skipped_binary} binary/unreadable files)")
            if skipped_large:
                notes.append(f"(skipped {skipped_large} large files)")
            if output_mode == "count" and counts:
                notes.append(
                    f"(total matches: {sum(counts.values())} in {len(counts)} files)"
                )
            if notes:
                result += "\n\n" + "\n".join(notes)
            return result
        except PermissionError as e:
            return f"Error: {e}"
        except Exception as e:
            return f"Error searching files: {e}"

View File

@ -8,6 +8,12 @@ Each skill is a directory containing a `SKILL.md` file with:
- YAML frontmatter (name, description, metadata)
- Markdown instructions for the agent
When skills reference large local documentation or logs, prefer nanobot's built-in
`grep` / `glob` tools to narrow the search space before loading full files.
Use `grep(output_mode="count")` / `files_with_matches` for broad searches first,
use `head_limit` / `offset` to page through large result sets,
and `glob(entry_type="dirs")` when discovering directory structure matters.
## Attribution
These skills are adapted from [OpenClaw](https://github.com/openclaw/openclaw)'s skill system.

View File

@ -11,16 +11,23 @@ always: true
- `SOUL.md` — Bot personality and communication style. **Managed by Dream.** Do NOT edit.
- `USER.md` — User profile and preferences. **Managed by Dream.** Do NOT edit.
- `memory/MEMORY.md` — Long-term facts (project context, important events). **Managed by Dream.** Do NOT edit.
- `memory/history.jsonl` — append-only JSONL, not loaded into context. search with `jq`-style tools.
- `memory/history.jsonl` — append-only JSONL, not loaded into context. Prefer the built-in `grep` tool to search it.
## Search Past Events
`memory/history.jsonl` is JSONL format — each line is a JSON object with `cursor`, `timestamp`, `content`.
- For broad searches, start with `grep(..., path="memory", glob="*.jsonl", output_mode="count")` or the default `files_with_matches` mode before expanding to full content
- Use `output_mode="content"` plus `context_before` / `context_after` when you need the exact matching lines
- Use `fixed_strings=true` for literal timestamps or JSON fragments
- Use `head_limit` / `offset` to page through long histories
- Use `exec` only as a last-resort fallback when the built-in search cannot express what you need
Examples (replace `keyword`):
- **Python (cross-platform):** `python -c "import json; [print(json.loads(l).get('content','')) for l in open('memory/history.jsonl','r',encoding='utf-8') if l.strip() and 'keyword' in l.lower()][-20:]"`
- **jq:** `cat memory/history.jsonl | jq -r 'select(.content | test("keyword"; "i")) | .content' | tail -20`
- **grep:** `grep -i "keyword" memory/history.jsonl`
- `grep(pattern="keyword", path="memory/history.jsonl", case_insensitive=true)`
- `grep(pattern="2026-04-02 10:00", path="memory/history.jsonl", fixed_strings=true)`
- `grep(pattern="keyword", path="memory", glob="*.jsonl", output_mode="count", case_insensitive=true)`
- `grep(pattern="oauth|token", path="memory", glob="*.jsonl", output_mode="content", case_insensitive=true)`
## Important

View File

@ -86,7 +86,7 @@ Documentation and reference material intended to be loaded as needed into contex
- **Examples**: `references/finance.md` for financial schemas, `references/mnda.md` for company NDA template, `references/policies.md` for company policies, `references/api_docs.md` for API specifications
- **Use cases**: Database schemas, API documentation, domain knowledge, company policies, detailed workflow guides
- **Benefits**: Keeps SKILL.md lean, loaded only when the agent determines it's needed
- **Best practice**: If files are large (>10k words), include grep search patterns in SKILL.md
- **Best practice**: If files are large (>10k words), include grep or glob patterns in SKILL.md so the agent can use built-in search tools efficiently; mention when the default `grep(output_mode="files_with_matches")`, `grep(output_mode="count")`, `grep(fixed_strings=true)`, `glob(entry_type="dirs")`, or pagination via `head_limit` / `offset` is the right first step
- **Avoid duplication**: Information should live in either SKILL.md or references files, not both. Prefer references files for detailed information unless it's truly core to the skill—this keeps SKILL.md lean while making information discoverable without hogging the context window. Keep only essential procedural instructions and workflow guidance in SKILL.md; move detailed reference material, schemas, and examples to references files.
##### Assets (`assets/`)

View File

@ -10,6 +10,27 @@ This file documents non-obvious constraints and usage patterns.
- Output is truncated at 10,000 characters
- `restrictToWorkspace` config can limit file access to the workspace
## glob — File Discovery
- Use `glob` to find files by pattern before falling back to shell commands
- Simple patterns like `*.py` match recursively by filename
- Use `entry_type="dirs"` when you need matching directories instead of files
- Use `head_limit` and `offset` to page through large result sets
- Prefer this over `exec` when you only need file paths
## grep — Content Search
- Use `grep` to search file contents inside the workspace
- Default behavior returns only matching file paths (`output_mode="files_with_matches"`)
- Supports optional `glob` filtering plus `context_before` / `context_after`
- Supports `type="py"`, `type="ts"`, `type="md"` and similar shorthand filters
- Use `fixed_strings=true` for literal keywords containing regex characters
- Use `output_mode="files_with_matches"` to get only matching file paths
- Use `output_mode="count"` to size a search before reading full matches
- Use `head_limit` and `offset` to page across results
- Prefer this over `exec` for code and history searches
- Binary or oversized files may be skipped to keep results readable
## cron — Scheduled Reminders
- Please refer to cron skill for usage.

View File

@ -8,7 +8,7 @@ You are nanobot, a helpful AI assistant.
## Workspace
Your workspace is at: {{ workspace_path }}
- Long-term memory: {{ workspace_path }}/memory/MEMORY.md (automatically managed by Dream — do not edit directly)
- History log: {{ workspace_path }}/memory/history.jsonl (append-only JSONL, not grep-searchable).
- History log: {{ workspace_path }}/memory/history.jsonl (append-only JSONL; prefer built-in `grep` for search).
- Custom skills: {{ workspace_path }}/skills/{% raw %}{skill-name}{% endraw %}/SKILL.md
{{ platform_policy }}
@ -19,6 +19,8 @@ Your workspace is at: {{ workspace_path }}
- After writing or editing a file, re-read it if accuracy matters.
- If a tool call fails, analyze the error before retrying with a different approach.
- Ask for clarification when the request is ambiguous.
- Prefer built-in `grep` / `glob` tools for workspace search before falling back to `exec`.
- On broad searches, use `grep(output_mode="count")` or `grep(output_mode="files_with_matches")` to scope the result set before requesting full content.
{% include 'agent/_snippets/untrusted_content.md' %}
Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel.

View File

@ -0,0 +1,325 @@
"""Tests for grep/glob search tools."""
from __future__ import annotations
import os
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from nanobot.agent.loop import AgentLoop
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.search import GlobTool, GrepTool
from nanobot.bus.queue import MessageBus
@pytest.mark.asyncio
async def test_glob_matches_recursively_and_skips_noise_dirs(tmp_path: Path) -> None:
    """Glob recurses into ordinary directories but prunes ignored ones."""
    for folder in ("src", "nested", "node_modules"):
        (tmp_path / folder).mkdir()
    (tmp_path / "src" / "app.py").write_text("print('ok')\n", encoding="utf-8")
    (tmp_path / "nested" / "util.py").write_text("print('ok')\n", encoding="utf-8")
    (tmp_path / "node_modules" / "skip.py").write_text("print('skip')\n", encoding="utf-8")
    tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(pattern="*.py", path=".")
    assert "src/app.py" in output
    assert "nested/util.py" in output
    assert "node_modules/skip.py" not in output
@pytest.mark.asyncio
async def test_glob_can_return_directories_only(tmp_path: Path) -> None:
    """entry_type='dirs' lists only matching directories, slash-suffixed."""
    api_dir = tmp_path / "src" / "api"
    api_dir.mkdir(parents=True)
    (api_dir / "handlers.py").write_text("ok\n", encoding="utf-8")
    tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(pattern="api", path="src", entry_type="dirs")
    assert output.splitlines() == ["src/api/"]
@pytest.mark.asyncio
async def test_grep_respects_glob_filter_and_context(tmp_path: Path) -> None:
    """Content mode honours the glob filter and shows surrounding context."""
    src = tmp_path / "src"
    src.mkdir()
    (src / "main.py").write_text("alpha\nbeta\nmatch_here\ngamma\n", encoding="utf-8")
    (tmp_path / "README.md").write_text("match_here\n", encoding="utf-8")
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(
        pattern="match_here",
        path=".",
        glob="*.py",
        output_mode="content",
        context_before=1,
        context_after=1,
    )
    assert "src/main.py:3" in output
    assert "2| beta" in output
    assert "> 3| match_here" in output
    assert "4| gamma" in output
    assert "README.md" not in output
@pytest.mark.asyncio
async def test_grep_defaults_to_files_with_matches(tmp_path: Path) -> None:
    """Without output_mode, grep returns matching paths only — no line text."""
    src = tmp_path / "src"
    src.mkdir()
    (src / "main.py").write_text("match_here\n", encoding="utf-8")
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(pattern="match_here", path="src")
    assert output.splitlines() == ["src/main.py"]
    assert "1|" not in output
@pytest.mark.asyncio
async def test_grep_supports_case_insensitive_search(tmp_path: Path) -> None:
    """case_insensitive=True matches regardless of letter case."""
    memory = tmp_path / "memory"
    memory.mkdir()
    (memory / "HISTORY.md").write_text(
        "[2026-04-02 10:00] OAuth token rotated\n",
        encoding="utf-8",
    )
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(
        pattern="oauth",
        path="memory/HISTORY.md",
        case_insensitive=True,
        output_mode="content",
    )
    assert "memory/HISTORY.md:1" in output
    assert "OAuth token rotated" in output
@pytest.mark.asyncio
async def test_grep_type_filter_limits_files(tmp_path: Path) -> None:
    """A type shorthand restricts the search to matching extensions."""
    src = tmp_path / "src"
    src.mkdir()
    (src / "a.py").write_text("needle\n", encoding="utf-8")
    (src / "b.md").write_text("needle\n", encoding="utf-8")
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(pattern="needle", path="src", type="py")
    assert output.splitlines() == ["src/a.py"]
@pytest.mark.asyncio
async def test_grep_fixed_strings_treats_regex_chars_literally(tmp_path: Path) -> None:
    """fixed_strings=True escapes regex metacharacters like '[' and ']'."""
    memory = tmp_path / "memory"
    memory.mkdir()
    (memory / "HISTORY.md").write_text(
        "[2026-04-02 10:00] OAuth token rotated\n",
        encoding="utf-8",
    )
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(
        pattern="[2026-04-02 10:00]",
        path="memory/HISTORY.md",
        fixed_strings=True,
        output_mode="content",
    )
    assert "memory/HISTORY.md:1" in output
    assert "[2026-04-02 10:00] OAuth token rotated" in output
@pytest.mark.asyncio
async def test_grep_files_with_matches_mode_returns_unique_paths(tmp_path: Path) -> None:
    """Each file appears once, ordered most-recently-modified first."""
    src = tmp_path / "src"
    src.mkdir()
    older = src / "a.py"
    newer = src / "b.py"
    older.write_text("needle\nneedle\n", encoding="utf-8")
    newer.write_text("needle\n", encoding="utf-8")
    os.utime(older, (1, 1))
    os.utime(newer, (2, 2))
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(
        pattern="needle",
        path="src",
        output_mode="files_with_matches",
    )
    assert output.splitlines() == ["src/b.py", "src/a.py"]
@pytest.mark.asyncio
async def test_grep_files_with_matches_supports_head_limit_and_offset(tmp_path: Path) -> None:
    """head_limit/offset page through the file list and add a pagination note."""
    src = tmp_path / "src"
    src.mkdir()
    for name in ("a.py", "b.py", "c.py"):
        (src / name).write_text("needle\n", encoding="utf-8")
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(pattern="needle", path="src", head_limit=1, offset=1)
    assert output.splitlines()[0] == "src/b.py"
    assert "pagination: limit=1, offset=1" in output
@pytest.mark.asyncio
async def test_grep_count_mode_reports_counts_per_file(tmp_path: Path) -> None:
    """count mode lists per-file match counts plus a grand-total note."""
    logs = tmp_path / "logs"
    logs.mkdir()
    (logs / "one.log").write_text("warn\nok\nwarn\n", encoding="utf-8")
    (logs / "two.log").write_text("warn\n", encoding="utf-8")
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(pattern="warn", path="logs", output_mode="count")
    assert "logs/one.log: 2" in output
    assert "logs/two.log: 1" in output
    assert "total matches: 3 in 2 files" in output
@pytest.mark.asyncio
async def test_grep_files_with_matches_mode_respects_max_results(tmp_path: Path) -> None:
    """The legacy max_results alias caps the file list like head_limit."""
    src = tmp_path / "src"
    src.mkdir()
    for stamp, name in enumerate(("a.py", "b.py", "c.py"), start=1):
        target = src / name
        target.write_text("needle\n", encoding="utf-8")
        os.utime(target, (stamp, stamp))
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(
        pattern="needle",
        path="src",
        output_mode="files_with_matches",
        max_results=2,
    )
    assert output.splitlines()[:2] == ["src/c.py", "src/b.py"]
    assert "pagination: limit=2, offset=0" in output
@pytest.mark.asyncio
async def test_glob_supports_head_limit_offset_and_recent_first(tmp_path: Path) -> None:
    """Glob orders matches newest-first and pages with head_limit/offset."""
    src = tmp_path / "src"
    src.mkdir()
    for stamp, name in enumerate(("a.py", "b.py", "c.py"), start=1):
        target = src / name
        target.write_text(f"{name[0]}\n", encoding="utf-8")
        os.utime(target, (stamp, stamp))
    tool = GlobTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(pattern="*.py", path="src", head_limit=1, offset=1)
    assert output.splitlines()[0] == "src/b.py"
    assert "pagination: limit=1, offset=1" in output
@pytest.mark.asyncio
async def test_grep_reports_skipped_binary_and_large_files(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Binary and oversized files are skipped and reported in the notes."""
    (tmp_path / "binary.bin").write_bytes(b"\x00\x01\x02")
    (tmp_path / "large.txt").write_text("x" * 20, encoding="utf-8")
    # Shrink the size cap so the 20-byte file counts as "large".
    monkeypatch.setattr(GrepTool, "_MAX_FILE_BYTES", 10)
    tool = GrepTool(workspace=tmp_path, allowed_dir=tmp_path)
    output = await tool.execute(pattern="needle", path=".")
    assert "No matches found" in output
    assert "skipped 1 binary/unreadable files" in output
    assert "skipped 1 large files" in output
@pytest.mark.asyncio
async def test_search_tools_reject_paths_outside_workspace(tmp_path: Path) -> None:
    """Both tools refuse paths that escape the allowed directory."""
    outside = tmp_path.parent / "outside-search.txt"
    outside.write_text("secret\n", encoding="utf-8")
    grep_result = await GrepTool(workspace=tmp_path, allowed_dir=tmp_path).execute(
        pattern="secret", path=str(outside)
    )
    glob_result = await GlobTool(workspace=tmp_path, allowed_dir=tmp_path).execute(
        pattern="*.txt", path=str(outside.parent)
    )
    assert grep_result.startswith("Error:")
    assert glob_result.startswith("Error:")
def test_agent_loop_registers_grep_and_glob(tmp_path: Path) -> None:
    """AgentLoop exposes the built-in search tools by default."""
    provider = MagicMock()
    provider.get_default_model.return_value = "test-model"
    loop = AgentLoop(bus=MessageBus(), provider=provider, workspace=tmp_path, model="test-model")
    for expected in ("grep", "glob"):
        assert expected in loop.tools.tool_names
@pytest.mark.asyncio
async def test_subagent_registers_grep_and_glob(tmp_path: Path) -> None:
    """Subagent runs register grep and glob in their per-run tool registry."""
    provider = MagicMock()
    provider.get_default_model.return_value = "test-model"
    mgr = SubagentManager(
        provider=provider,
        workspace=tmp_path,
        bus=MessageBus(),
        max_tool_result_chars=4096,
    )
    seen: dict[str, list[str]] = {}

    async def fake_run(spec):
        # Capture the registry contents instead of actually running the agent.
        seen["tool_names"] = spec.tools.tool_names
        return SimpleNamespace(
            stop_reason="ok",
            final_content="done",
            tool_events=[],
            error=None,
        )

    mgr.runner.run = fake_run
    mgr._announce_result = AsyncMock()
    await mgr._run_subagent("sub-1", "search task", "label", {"channel": "cli", "chat_id": "direct"})
    assert "grep" in seen["tool_names"]
    assert "glob" in seen["tool_names"]