feat(tools): improve coding workflow recovery

This commit is contained in:
Xubin Ren 2026-05-21 00:58:05 +08:00
parent 3e154bb5cf
commit 480ca28a2d
9 changed files with 571 additions and 32 deletions

View File

@ -19,7 +19,8 @@ from nanobot.utils.file_edit_events import (
build_file_edit_end_event,
build_file_edit_error_event,
build_file_edit_start_event,
prepare_file_edit_tracker,
prepare_file_edit_tracker as _prepare_file_edit_tracker,
prepare_file_edit_trackers,
StreamingFileEditTracker,
)
from nanobot.utils.helpers import (
@ -58,11 +59,14 @@ _SNIP_SAFETY_BUFFER = 1024
_MICROCOMPACT_KEEP_RECENT = 10
_MICROCOMPACT_MIN_CHARS = 500
_COMPACTABLE_TOOLS = frozenset({
"read_file", "exec", "grep",
"web_search", "web_fetch", "list_dir",
"read_file", "exec", "grep", "find_files",
"web_search", "web_fetch", "list_dir", "list_exec_sessions",
})
_BACKFILL_CONTENT = "[Tool result unavailable — call was interrupted or lost]"
# Backward-compatible module attribute for tests/extensions that monkeypatch
# the former single-file tracker hook. Runtime uses prepare_file_edit_trackers.
prepare_file_edit_tracker = _prepare_file_edit_tracker
@dataclass(slots=True)
@ -857,8 +861,8 @@ class AgentRunner:
and on_progress_accepts_file_edit_events(spec.progress_callback)
)
progress_callback = spec.progress_callback if emit_file_edit_events else None
file_edit_tracker = (
prepare_file_edit_tracker(
file_edit_trackers = (
prepare_file_edit_trackers(
call_id=tool_call.id,
tool_name=tool_call.name,
tool=tool,
@ -868,13 +872,13 @@ class AgentRunner:
if progress_callback is not None
else None
)
if file_edit_tracker is not None and progress_callback is not None:
if file_edit_trackers and progress_callback is not None:
await invoke_file_edit_progress(
progress_callback,
[build_file_edit_start_event(
file_edit_tracker,
params if isinstance(params, dict) else None,
)],
) for file_edit_tracker in file_edit_trackers],
)
try:
if tool is not None:
@ -884,10 +888,13 @@ class AgentRunner:
except asyncio.CancelledError:
raise
except BaseException as exc:
if file_edit_tracker is not None and progress_callback is not None:
if file_edit_trackers and progress_callback is not None:
await invoke_file_edit_progress(
progress_callback,
[build_file_edit_error_event(file_edit_tracker, str(exc))],
[
build_file_edit_error_event(file_edit_tracker, str(exc))
for file_edit_tracker in file_edit_trackers
],
)
event = {
"name": tool_call.name,
@ -910,10 +917,13 @@ class AgentRunner:
return payload, event, None
if isinstance(result, str) and result.startswith("Error"):
if file_edit_tracker is not None and progress_callback is not None:
if file_edit_trackers and progress_callback is not None:
await invoke_file_edit_progress(
progress_callback,
[build_file_edit_error_event(file_edit_tracker, result)],
[
build_file_edit_error_event(file_edit_tracker, result)
for file_edit_tracker in file_edit_trackers
],
)
event = {
"name": tool_call.name,
@ -933,13 +943,13 @@ class AgentRunner:
return result + hint, event, RuntimeError(result)
return result + hint, event, None
if file_edit_tracker is not None and progress_callback is not None:
if file_edit_trackers and progress_callback is not None:
await invoke_file_edit_progress(
progress_callback,
[build_file_edit_end_event(
file_edit_tracker,
params if isinstance(params, dict) else None,
)],
) for file_edit_tracker in file_edit_trackers],
)
detail = "" if result is None else str(result)

View File

@ -25,22 +25,39 @@ class _SessionPoll:
output: str
done: bool
exit_code: int | None
elapsed_s: float = 0.0
timed_out: bool = False
terminated: bool = False
stdin_closed: bool = False
truncated_chars: int = 0
@dataclass(slots=True)
class ExecSessionInfo:
session_id: str
command: str
cwd: str
elapsed_s: float
idle_s: float
remaining_s: float
returncode: int | None
class _ExecSession:
def __init__(
self,
*,
session_id: str,
process: asyncio.subprocess.Process,
command: str,
cwd: str,
timeout: int,
) -> None:
self.session_id = session_id
self.process = process
self.command = command
self.cwd = cwd
self.started_at = time.monotonic()
self.deadline = time.monotonic() + timeout
self.last_access = time.monotonic()
self._chunks: list[str] = []
@ -122,6 +139,7 @@ class _ExecSession:
output=output,
done=self.process.returncode is not None,
exit_code=self.process.returncode,
elapsed_s=max(0.0, time.monotonic() - self.started_at),
timed_out=self._timed_out,
terminated=terminated,
stdin_closed=stdin_closed,
@ -161,7 +179,13 @@ class ExecSessionManager:
raise RuntimeError(f"maximum exec sessions reached ({self.max_sessions})")
process = await self._spawn(command, cwd, env, shell_program, login)
session_id = uuid.uuid4().hex[:12]
session = _ExecSession(session_id=session_id, process=process, timeout=timeout)
session = _ExecSession(
session_id=session_id,
process=process,
command=command,
cwd=cwd,
timeout=timeout,
)
self._sessions[session_id] = session
poll = await session.poll(yield_time_ms, max_output_chars)
@ -186,7 +210,7 @@ class ExecSessionManager:
if session is None:
raise KeyError(session_id)
if chars is not None:
if chars:
error = await session.write(chars)
if error:
raise RuntimeError(error)
@ -209,13 +233,29 @@ class ExecSessionManager:
self._sessions.pop(session_id, None)
return poll
async def list(self) -> list[ExecSessionInfo]:
async with self._lock:
await self._cleanup_locked()
now = time.monotonic()
return [
ExecSessionInfo(
session_id=session_id,
command=session.command,
cwd=session.cwd,
elapsed_s=max(0.0, now - session.started_at),
idle_s=max(0.0, now - session.last_access),
remaining_s=max(0.0, session.deadline - now),
returncode=session.process.returncode,
)
for session_id, session in sorted(self._sessions.items())
]
async def _cleanup_locked(self) -> None:
now = time.monotonic()
stale = [
session_id
for session_id, session in self._sessions.items()
if session.process.returncode is not None
or now - session.last_access > self.idle_timeout
if now - session.last_access > self.idle_timeout
]
for session_id in stale:
session = self._sessions.pop(session_id)
@ -291,6 +331,7 @@ def format_session_poll(session_id: str, poll: _SessionPoll) -> str:
parts.append(f"Exit code: {poll.exit_code}")
else:
parts.append(f"Process running. session_id: {session_id}")
parts.append(f"Elapsed: {poll.elapsed_s:.1f}s")
return "\n".join(parts) if parts else "(no output yet)"
@ -407,3 +448,68 @@ class WriteStdinTool(Tool):
return f"Error: exec session not found: {session_id}"
except Exception as exc:
return f"Error writing to exec session: {exc}"
@tool_parameters(tool_parameters_schema())
class ListExecSessionsTool(Tool):
"""List active exec sessions."""
_scopes = {"core", "subagent"}
config_key = "exec"
@classmethod
def config_cls(cls):
from nanobot.agent.tools.shell import ExecToolConfig
return ExecToolConfig
@classmethod
def enabled(cls, ctx: Any) -> bool:
return ctx.config.exec.enable
def __init__(
self,
*,
manager: ExecSessionManager | None = None,
) -> None:
self._manager = manager or DEFAULT_EXEC_SESSION_MANAGER
@classmethod
def create(cls, ctx: Any) -> Tool:
return cls()
@property
def name(self) -> str:
return "list_exec_sessions"
@property
def description(self) -> str:
return (
"List active long-running exec sessions, including session_id, cwd, "
"elapsed time, idle time, remaining timeout, and command preview. "
"Use this to recover a session_id before polling with write_stdin."
)
@property
def read_only(self) -> bool:
return True
async def execute(self, **kwargs: Any) -> str:
try:
sessions = await self._manager.list()
if not sessions:
return "No active exec sessions."
lines = []
for info in sessions:
command = " ".join(info.command.split())
if len(command) > 120:
command = command[:119] + "..."
status = "exited" if info.returncode is not None else "running"
lines.append(
f"{info.session_id} | {status} | elapsed={info.elapsed_s:.1f}s "
f"| idle={info.idle_s:.1f}s | remaining={info.remaining_s:.1f}s "
f"| cwd={info.cwd} | {command}"
)
return "\n".join(lines)
except Exception as exc:
return f"Error listing exec sessions: {exc}"

View File

@ -1,4 +1,4 @@
"""Search tools: grep."""
"""Search tools: file discovery and grep."""
from __future__ import annotations
@ -12,6 +12,7 @@ from typing import Any, Iterable, TypeVar
from nanobot.agent.tools.filesystem import ListDirTool, _FsTool
_DEFAULT_HEAD_LIMIT = 250
_DEFAULT_FILE_HEAD_LIMIT = 200
T = TypeVar("T")
_TYPE_GLOB_MAP = {
"py": ("*.py", "*.pyi"),
@ -88,6 +89,14 @@ def _matches_type(name: str, file_type: str | None) -> bool:
return any(fnmatch.fnmatch(name.lower(), pattern.lower()) for pattern in patterns)
def _matches_query(rel_path: str, query: str | None) -> bool:
if not query:
return True
haystack = rel_path.lower()
terms = [part for part in query.lower().split() if part]
return all(term in haystack for term in terms)
class _SearchTool(_FsTool):
_IGNORE_DIRS = set(ListDirTool._IGNORE_DIRS)
@ -109,6 +118,161 @@ class _SearchTool(_FsTool):
yield current / filename
class FindFilesTool(_SearchTool):
"""Find files by path fragment, glob, or type."""
_scopes = {"core", "subagent"}
@property
def name(self) -> str:
return "find_files"
@property
def description(self) -> str:
return (
"Find files by path fragment, glob, or file type. "
"Use this before read_file when you need to locate files. "
"Returns workspace-relative paths and skips common dependency/build directories."
)
@property
def read_only(self) -> bool:
return True
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Directory or file to search in (default '.')",
},
"query": {
"type": "string",
"description": (
"Optional case-insensitive path fragment search. "
"Whitespace-separated terms must all be present."
),
},
"glob": {
"type": "string",
"description": "Optional file filter, e.g. '*.py' or 'tests/**/test_*.py'",
},
"type": {
"type": "string",
"description": "Optional file type shorthand, e.g. 'py', 'ts', 'md', 'json'",
},
"include_dirs": {
"type": "boolean",
"description": "Include matching directories as well as files (default false)",
},
"sort": {
"type": "string",
"enum": ["path", "modified"],
"description": "Sort by path or most recently modified first (default path)",
},
"head_limit": {
"type": "integer",
"description": "Maximum number of paths to return (default 200, 0 for all, max 1000)",
"minimum": 0,
"maximum": 1000,
},
"offset": {
"type": "integer",
"description": "Skip the first N results before applying head_limit",
"minimum": 0,
"maximum": 100000,
},
},
}
def _iter_paths(self, root: Path, *, include_dirs: bool) -> Iterable[Path]:
if root.is_file():
yield root
return
if include_dirs:
yield root
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = sorted(d for d in dirnames if d not in self._IGNORE_DIRS)
current = Path(dirpath)
if include_dirs and current != root:
yield current
for filename in sorted(filenames):
yield current / filename
async def execute(
self,
path: str = ".",
query: str | None = None,
glob: str | None = None,
type: str | None = None,
include_dirs: bool = False,
sort: str = "path",
head_limit: int | None = None,
offset: int = 0,
**kwargs: Any,
) -> str:
try:
target = self._resolve(path or ".")
if not target.exists():
return f"Error: Path not found: {path}"
if not (target.is_dir() or target.is_file()):
return f"Error: Unsupported path: {path}"
if sort not in {"path", "modified"}:
return "Error: sort must be 'path' or 'modified'"
limit = (
_DEFAULT_FILE_HEAD_LIMIT
if head_limit is None
else None if head_limit == 0 else head_limit
)
root = target if target.is_dir() else target.parent
matches: list[tuple[str, float]] = []
for candidate in self._iter_paths(target, include_dirs=include_dirs):
if candidate.is_dir() and not include_dirs:
continue
rel_path = candidate.relative_to(root).as_posix()
display_path = self._display_path(candidate, root)
name = candidate.name
if glob and not _match_glob(rel_path, name, glob):
continue
if candidate.is_file() and not _matches_type(name, type):
continue
if candidate.is_dir() and type:
continue
if not _matches_query(display_path, query):
continue
try:
mtime = candidate.stat().st_mtime
except OSError:
mtime = 0.0
suffix = "/" if candidate.is_dir() else ""
matches.append((display_path + suffix, mtime))
if sort == "modified":
matches.sort(key=lambda item: (-item[1], item[0]))
else:
matches.sort(key=lambda item: item[0])
paths = [item[0] for item in matches]
paged, truncated = _paginate(paths, limit, offset)
if not paged:
return "No files found"
result = "\n".join(paged)
note = _pagination_note(limit, offset, truncated)
if note:
result += "\n\n" + note
return result
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error finding files: {e}"
class GrepTool(_SearchTool):
"""Search file contents using a regex-like pattern."""
_scopes = {"core", "subagent"}

View File

@ -10,7 +10,7 @@ from pathlib import Path
from typing import Any, Awaitable, Callable
TRACKED_FILE_EDIT_TOOLS = frozenset({"write_file", "edit_file"})
TRACKED_FILE_EDIT_TOOLS = frozenset({"write_file", "edit_file", "apply_patch"})
_MAX_SNAPSHOT_BYTES = 2 * 1024 * 1024
_LIVE_EMIT_INTERVAL_S = 0.18
_LIVE_EMIT_LINE_STEP = 24
@ -153,19 +153,108 @@ def prepare_file_edit_tracker(
workspace: Path | None,
params: dict[str, Any] | None,
) -> FileEditTracker | None:
trackers = prepare_file_edit_trackers(
call_id=call_id,
tool_name=tool_name,
tool=tool,
workspace=workspace,
params=params,
)
return trackers[0] if trackers else None
def prepare_file_edit_trackers(
*,
call_id: str,
tool_name: str,
tool: Any,
workspace: Path | None,
params: dict[str, Any] | None,
) -> list[FileEditTracker]:
if not is_file_edit_tool(tool_name):
return None
return []
paths = resolve_file_edit_paths(tool_name, tool, workspace, params)
trackers: list[FileEditTracker] = []
seen: set[Path] = set()
for path in paths:
try:
resolved = path.resolve()
except Exception:
resolved = path
if resolved in seen:
continue
seen.add(resolved)
before = read_file_snapshot(path)
trackers.append(FileEditTracker(
call_id=str(call_id or ""),
tool=tool_name,
path=path,
display_path=display_file_edit_path(path, workspace),
before=before,
))
return trackers
def resolve_file_edit_paths(
tool_name: str,
tool: Any,
workspace: Path | None,
params: dict[str, Any] | None,
) -> list[Path]:
if tool_name == "apply_patch":
return _resolve_apply_patch_paths(tool, workspace, params)
path = resolve_file_edit_path(tool, workspace, params)
if path is None:
return None
before = read_file_snapshot(path)
return FileEditTracker(
call_id=str(call_id or ""),
tool=tool_name,
path=path,
display_path=display_file_edit_path(path, workspace),
before=before,
)
return []
return [path]
def _resolve_apply_patch_paths(
tool: Any,
workspace: Path | None,
params: dict[str, Any] | None,
) -> list[Path]:
if not isinstance(params, dict):
return []
patch = params.get("patch")
if not isinstance(patch, str) or not patch.strip():
return []
try:
from nanobot.agent.tools.apply_patch import _parse_patch
ops = _parse_patch(patch)
except Exception:
return []
resolved: list[Path] = []
for op in ops:
for raw_path in (op.path, op.new_path):
if not raw_path:
continue
path = _resolve_raw_file_edit_path(tool, workspace, raw_path)
if path is not None:
resolved.append(path)
return resolved
def _resolve_raw_file_edit_path(
tool: Any,
workspace: Path | None,
raw_path: str,
) -> Path | None:
resolver = getattr(tool, "_resolve", None)
if callable(resolver):
try:
resolved = resolver(raw_path)
if isinstance(resolved, Path):
return resolved
if resolved:
return Path(resolved)
except Exception:
return None
if workspace is None:
return Path(raw_path).expanduser().resolve()
return (workspace / raw_path).expanduser().resolve()
def build_file_edit_start_event(

View File

@ -11,8 +11,10 @@ _TOOL_FORMATS: dict[str, tuple[list[str], str, bool, bool]] = {
"read_file": (["path", "file_path"], "read {}", True, False),
"write_file": (["path", "file_path"], "write {}", True, False),
"edit": (["file_path", "path"], "edit {}", True, False),
"find_files": (["query", "glob", "path"], "find {}", False, False),
"grep": (["pattern"], 'grep "{}"', False, False),
"exec": (["command"], "$ {}", False, True),
"list_exec_sessions": ([], "exec sessions", False, False),
"web_search": (["query"], 'search "{}"', False, False),
"web_fetch": (["url"], "fetch {}", True, False),
"list_dir": (["path"], "ls {}", True, False),
@ -81,6 +83,8 @@ def _extract_arg(tc, key_args: list[str]) -> str | None:
def _fmt_known(tc, fmt: tuple, max_length: int = 40) -> str:
"""Format a registered tool using its template."""
if not fmt[0] and "{}" not in fmt[1]:
return fmt[1]
val = _extract_arg(tc, fmt[0])
if val is None:
return tc.name

View File

@ -7,7 +7,7 @@ import subprocess
import sys
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.exec_session import ExecSessionManager, WriteStdinTool
from nanobot.agent.tools.exec_session import ExecSessionManager, ListExecSessionsTool, WriteStdinTool
def _python_command(code: str) -> str:
@ -140,8 +140,10 @@ def test_exec_can_continue_with_stdin(tmp_path):
initial, result = asyncio.run(run())
assert "ready" in initial
assert "Process running" in initial
assert "Elapsed:" in initial
assert "got:ping" in result
assert "Exit code: 0" in result
assert "Elapsed:" in result
def test_write_stdin_can_close_stdin(tmp_path):
@ -220,6 +222,29 @@ def test_write_stdin_accepts_max_output_tokens_alias(tmp_path):
assert "Session terminated." in cleanup
def test_write_stdin_preserves_completed_session_output_until_polled(tmp_path):
async def run() -> tuple[str, str]:
manager = ExecSessionManager()
exec_tool = ExecTool(working_dir=str(tmp_path), timeout=5, session_manager=manager)
stdin_tool = WriteStdinTool(manager=manager)
command = _python_command(
"import time; print('ready', flush=True); "
"time.sleep(1.0); print('done', flush=True)"
)
initial = await exec_tool.execute(command=command, yield_time_ms=300)
sid = _session_id(initial)
await asyncio.sleep(1.2)
final = await stdin_tool.execute(session_id=sid, chars="", yield_time_ms=0)
return initial, final
initial, final = asyncio.run(run())
assert "ready" in initial
assert "done" in final
assert "Exit code: 0" in final
def test_exec_session_mode_reuses_exec_safety_guard(tmp_path):
manager = ExecSessionManager()
tool = ExecTool(
@ -240,3 +265,35 @@ def test_write_stdin_reports_missing_session(tmp_path):
result = asyncio.run(tool.execute(session_id="missing", chars=""))
assert "exec session not found" in result
def test_list_exec_sessions_reports_running_commands(tmp_path):
async def run() -> tuple[str, str, str]:
manager = ExecSessionManager()
exec_tool = ExecTool(working_dir=str(tmp_path), timeout=5, session_manager=manager)
list_tool = ListExecSessionsTool(manager=manager)
stdin_tool = WriteStdinTool(manager=manager)
command = _python_command(
"import time; print('ready', flush=True); time.sleep(5)"
)
initial = await exec_tool.execute(command=command, yield_time_ms=500)
sid = _session_id(initial)
listing = await list_tool.execute()
cleanup = await stdin_tool.execute(session_id=sid, terminate=True, yield_time_ms=0)
return sid, listing, cleanup
sid, listing, cleanup = asyncio.run(run())
assert sid in listing
assert "running" in listing
assert "elapsed=" in listing
assert "remaining=" in listing
assert str(tmp_path) in listing
assert "Session terminated." in cleanup
def test_list_exec_sessions_reports_empty_state():
result = asyncio.run(ListExecSessionsTool(manager=ExecSessionManager()).execute())
assert result == "No active exec sessions."

View File

@ -12,7 +12,7 @@ import pytest
from nanobot.agent.loop import AgentLoop
from nanobot.agent.subagent import SubagentManager, SubagentStatus
from nanobot.agent.tools.search import GrepTool
from nanobot.agent.tools.search import FindFilesTool, GrepTool
from nanobot.agent.tools.web import WebSearchTool
from nanobot.bus.queue import MessageBus
from nanobot.config.schema import WebSearchConfig
@ -33,6 +33,68 @@ async def test_web_search_tool_refreshes_dynamic_config_loader(monkeypatch) -> N
assert await tool.execute("nanobot") == "duckduckgo:nanobot:3"
@pytest.mark.asyncio
async def test_find_files_filters_by_query_glob_and_type(tmp_path: Path) -> None:
(tmp_path / "src").mkdir()
(tmp_path / "src" / "settings_view.tsx").write_text("export {}\n", encoding="utf-8")
(tmp_path / "src" / "settings_api.py").write_text("pass\n", encoding="utf-8")
(tmp_path / "README.md").write_text("settings\n", encoding="utf-8")
tool = FindFilesTool(workspace=tmp_path, allowed_dir=tmp_path)
result = await tool.execute(
path=".",
query="settings",
glob="src/**",
type="ts",
)
assert result.splitlines() == ["src/settings_view.tsx"]
@pytest.mark.asyncio
async def test_find_files_can_include_directories(tmp_path: Path) -> None:
(tmp_path / "src" / "settings").mkdir(parents=True)
(tmp_path / "src" / "settings" / "index.ts").write_text("export {}\n", encoding="utf-8")
tool = FindFilesTool(workspace=tmp_path, allowed_dir=tmp_path)
result = await tool.execute(path="src", query="settings", include_dirs=True)
assert "src/settings/" in result.splitlines()
assert "src/settings/index.ts" in result.splitlines()
@pytest.mark.asyncio
async def test_find_files_supports_modified_sort_and_pagination(tmp_path: Path) -> None:
(tmp_path / "src").mkdir()
for idx, name in enumerate(("a.py", "b.py", "c.py"), start=1):
file_path = tmp_path / "src" / name
file_path.write_text("pass\n", encoding="utf-8")
os.utime(file_path, (idx, idx))
tool = FindFilesTool(workspace=tmp_path, allowed_dir=tmp_path)
result = await tool.execute(
path="src",
type="py",
sort="modified",
head_limit=1,
offset=1,
)
assert result.splitlines()[0] == "src/b.py"
assert "pagination: limit=1, offset=1" in result
@pytest.mark.asyncio
async def test_find_files_rejects_paths_outside_workspace(tmp_path: Path) -> None:
outside = tmp_path.parent / "outside-find-files.txt"
outside.write_text("secret\n", encoding="utf-8")
tool = FindFilesTool(workspace=tmp_path, allowed_dir=tmp_path)
result = await tool.execute(path=str(outside))
assert result.startswith("Error:")
@pytest.mark.asyncio
async def test_grep_respects_glob_filter_and_context(tmp_path: Path) -> None:
(tmp_path / "src").mkdir()
@ -249,6 +311,7 @@ def test_agent_loop_registers_grep(tmp_path: Path) -> None:
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
assert "find_files" in loop.tools.tool_names
assert "grep" in loop.tools.tool_names
@ -280,6 +343,7 @@ async def test_subagent_registers_grep(tmp_path: Path) -> None:
status = SubagentStatus(task_id="sub-1", label="label", task_description="search task", started_at=time.monotonic())
await mgr._run_subagent("sub-1", "search task", "label", {"channel": "cli", "chat_id": "direct"}, status)
assert "find_files" in captured["tool_names"]
assert "grep" in captured["tool_names"]

View File

@ -408,7 +408,8 @@ def test_loader_registers_same_tools_as_old_hardcoded():
expected = {
"read_file", "write_file", "edit_file", "list_dir",
"grep", "exec", "web_search", "web_fetch",
"find_files", "grep", "exec", "write_stdin", "list_exec_sessions",
"web_search", "web_fetch",
"message", "spawn", "cron",
}
actual = set(registered)

View File

@ -9,6 +9,7 @@ from nanobot.utils.file_edit_events import (
build_file_edit_start_event,
line_diff_stats,
prepare_file_edit_tracker,
prepare_file_edit_trackers,
read_file_snapshot,
StreamingFileEditTracker,
)
@ -81,6 +82,49 @@ def test_binary_file_is_reported_but_not_counted(tmp_path: Path) -> None:
assert (event["added"], event["deleted"]) == (0, 0)
def test_apply_patch_prepares_trackers_for_each_touched_file(tmp_path: Path) -> None:
(tmp_path / "src").mkdir()
existing = tmp_path / "src" / "existing.py"
existing.write_text("old\nkeep\n", encoding="utf-8")
delete_me = tmp_path / "src" / "delete_me.py"
delete_me.write_text("gone\n", encoding="utf-8")
patch = """*** Begin Patch
*** Add File: src/new.py
+fresh
*** Update File: src/existing.py
@@
-old
+new
keep
*** Delete File: src/delete_me.py
*** End Patch"""
trackers = prepare_file_edit_trackers(
call_id="call-patch",
tool_name="apply_patch",
tool=None,
workspace=tmp_path,
params={"patch": patch},
)
assert [tracker.display_path for tracker in trackers] == [
"src/new.py",
"src/existing.py",
"src/delete_me.py",
]
(tmp_path / "src" / "new.py").write_text("fresh\n", encoding="utf-8")
existing.write_text("new\nkeep\n", encoding="utf-8")
delete_me.unlink()
events = [build_file_edit_end_event(tracker, {"patch": patch}) for tracker in trackers]
by_path = {event["path"]: event for event in events}
assert (by_path["src/new.py"]["added"], by_path["src/new.py"]["deleted"]) == (1, 0)
assert (by_path["src/existing.py"]["added"], by_path["src/existing.py"]["deleted"]) == (1, 1)
assert (by_path["src/delete_me.py"]["added"], by_path["src/delete_me.py"]["deleted"]) == (0, 1)
def test_oversized_write_file_end_uses_known_content_for_exact_count(tmp_path: Path) -> None:
target = tmp_path / "large.txt"
params = {"path": "large.txt", "content": "x" * (2 * 1024 * 1024 + 1)}