diff --git a/nanobot/agent/tools/apply_patch.py b/nanobot/agent/tools/apply_patch.py index c4dbf9f9f..c6a9b2883 100644 --- a/nanobot/agent/tools/apply_patch.py +++ b/nanobot/agent/tools/apply_patch.py @@ -1,4 +1,4 @@ -"""Structured patch editing tool for coding workflows.""" +"""Apply file edits by providing structured edit instructions.""" from __future__ import annotations @@ -6,29 +6,17 @@ import difflib import re from dataclasses import dataclass from pathlib import Path -from typing import Any, Literal +from typing import Any from nanobot.agent.tools.base import tool_parameters from nanobot.agent.tools.filesystem import _FsTool -from nanobot.agent.tools.schema import BooleanSchema, StringSchema, tool_parameters_schema - - -PatchKind = Literal["add", "delete", "update"] - - -@dataclass(slots=True) -class _Hunk: - header: str | None - lines: list[tuple[str, str]] - - -@dataclass(slots=True) -class _PatchOp: - kind: PatchKind - path: str - new_path: str | None = None - add_lines: list[str] | None = None - hunks: list[_Hunk] | None = None +from nanobot.agent.tools.schema import ( + ArraySchema, + BooleanSchema, + ObjectSchema, + StringSchema, + tool_parameters_schema, +) @dataclass(slots=True) @@ -37,22 +25,13 @@ class _PatchSummary: path: str added: int = 0 deleted: int = 0 - new_path: str | None = None class _PatchError(ValueError): pass -_ABSOLUTE_WINDOWS_RE = re.compile(r"^[A-Za-z]:[\\/]") - - -def _is_file_header(line: str) -> bool: - return ( - line.startswith("*** Add File: ") - or line.startswith("*** Delete File: ") - or line.startswith("*** Update File: ") - ) +_ABSOLUTE_WINDOWS_RE = re.compile(r"^[A-Za-z]:[\/]") def _validate_relative_path(path: str) -> str: @@ -63,7 +42,7 @@ def _validate_relative_path(path: str) -> str: raise _PatchError(f"patch path contains a null byte: {path!r}") if normalized.startswith(("~", "/", "\\")) or _ABSOLUTE_WINDOWS_RE.match(normalized): raise _PatchError(f"patch path must be relative: {path}") - if any(part == ".." for part in re.split(r"[\\/]+", normalized)): + if any(part == ".." for part in re.split(r"[\/]+", normalized)): raise _PatchError(f"patch path must not contain '..': {path}") return normalized @@ -97,198 +76,43 @@ def _line_diff_stats(before: str, after: str) -> tuple[int, int]: def _format_summary(summary: _PatchSummary) -> str: - path = ( - f"{summary.path} -> {summary.new_path}" - if summary.new_path - else summary.path - ) stats = "" if summary.added or summary.deleted: stats = f" (+{summary.added}/-{summary.deleted})" - return f"- {summary.action} {path}{stats}" - - -def _parse_patch(patch: str) -> list[_PatchOp]: - lines = patch.replace("\r\n", "\n").replace("\r", "\n").split("\n") - if lines and lines[-1] == "": - lines.pop() - if not lines or lines[0] != "*** Begin Patch": - raise _PatchError("patch must start with '*** Begin Patch'") - if len(lines) < 2 or lines[-1] != "*** End Patch": - raise _PatchError("patch must end with '*** End Patch'") - - ops: list[_PatchOp] = [] - i = 1 - end = len(lines) - 1 - while i < end: - line = lines[i] - if line.startswith("*** Add File: "): - path = _validate_relative_path(line.removeprefix("*** Add File: ")) - i += 1 - add_lines: list[str] = [] - while i < end and not _is_file_header(lines[i]): - if not lines[i].startswith("+"): - raise _PatchError(f"Add File lines must start with '+': {lines[i]!r}") - add_lines.append(lines[i][1:]) - i += 1 - ops.append(_PatchOp(kind="add", path=path, add_lines=add_lines)) - continue - - if line.startswith("*** Delete File: "): - path = _validate_relative_path(line.removeprefix("*** Delete File: ")) - ops.append(_PatchOp(kind="delete", path=path)) - i += 1 - continue - - if line.startswith("*** Update File: "): - path = _validate_relative_path(line.removeprefix("*** Update File: ")) - i += 1 - new_path: str | None = None - if i < end and lines[i].startswith("*** Move to: "): - new_path = _validate_relative_path(lines[i].removeprefix("*** Move to: ")) - i += 1 - - hunks: list[_Hunk] = [] - while i < end and not _is_file_header(lines[i]): - if not lines[i].startswith("@@"): - raise _PatchError(f"Update File sections require '@@' hunks: {lines[i]!r}") - header = lines[i][2:].strip() or None - i += 1 - hunk_lines: list[tuple[str, str]] = [] - while i < end and not lines[i].startswith("@@") and not _is_file_header(lines[i]): - if lines[i] == "*** End of File": - i += 1 - break - if lines[i] == r"\ No newline at end of file": - i += 1 - continue - if not lines[i] or lines[i][0] not in {" ", "+", "-"}: - raise _PatchError(f"Hunk lines must start with ' ', '+', or '-': {lines[i]!r}") - hunk_lines.append((lines[i][0], lines[i][1:])) - i += 1 - if not hunk_lines: - raise _PatchError(f"Update File hunk is empty: {path}") - hunks.append(_Hunk(header=header, lines=hunk_lines)) - if not hunks and new_path is None: - raise _PatchError(f"Update File requires at least one hunk or Move to: {path}") - ops.append(_PatchOp(kind="update", path=path, new_path=new_path, hunks=hunks)) - continue - - raise _PatchError(f"unknown patch header: {line!r}") - - if not ops: - raise _PatchError("patch contains no file operations") - return ops - - -def _find_with_eof_fallback(content: str, needle: str, start: int) -> tuple[int, int]: - pos = content.find(needle, start) - if pos >= 0: - return pos, len(needle) - if needle.endswith("\n"): - trimmed = needle[:-1] - pos = content.find(trimmed, start) - if pos >= 0 and pos + len(trimmed) == len(content): - return pos, len(trimmed) - return -1, 0 - - -def _line_offset(content: str, line_number: int) -> int: - if line_number <= 1: - return 0 - offset = 0 - for current, line in enumerate(content.splitlines(keepends=True), start=1): - if current >= line_number: - return offset - offset += len(line) - return len(content) - - -def _line_hint(header: str | None) -> int | None: - if not header: - return None - match = re.search(r"-(\d+)(?:,\d+)?", header) - return int(match.group(1)) if match else None - - -def _hunk_mismatch(path: str, old_text: str, content: str, header: str | None) -> str: - lines = content.splitlines(keepends=True) - old_lines = old_text.splitlines(keepends=True) - window = max(1, len(old_lines)) - best_ratio, best_start = -1.0, 0 - best_lines: list[str] = [] - for i in range(max(1, len(lines) - window + 1)): - current = lines[i : i + window] - ratio = difflib.SequenceMatcher(None, "".join(old_lines), "".join(current)).ratio() - if ratio > best_ratio: - best_ratio, best_start, best_lines = ratio, i, current - - label = f" after header {header!r}" if header else "" - if best_ratio <= 0: - return f"hunk does not match {path}{label}" - diff = "\n".join(difflib.unified_diff( - old_lines, - best_lines, - fromfile="patch hunk", - tofile=f"{path} (actual, line {best_start + 1})", - lineterm="", - )) - return ( - f"hunk does not match {path}{label}. " - f"Best match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}" - ) - - -def _apply_hunks(path: str, content: str, hunks: list[_Hunk]) -> str: - cursor = 0 - for hunk in hunks: - old_lines = [text for marker, text in hunk.lines if marker in {" ", "-"}] - new_lines = [text for marker, text in hunk.lines if marker in {" ", "+"}] - old_text = _lines_to_text(old_lines) - new_text = _lines_to_text(new_lines) - - search_start = cursor - line_hint = None - if hunk.header: - line_hint = _line_hint(hunk.header) - if line_hint is not None: - search_start = _line_offset(content, line_hint) - else: - header_pos = content.find(hunk.header, cursor) - if header_pos >= 0: - search_start = header_pos - - if old_text: - pos, match_len = _find_with_eof_fallback(content, old_text, search_start) - if pos < 0 and search_start != 0 and line_hint is None: - pos, match_len = _find_with_eof_fallback(content, old_text, 0) - if pos < 0: - raise _PatchError(_hunk_mismatch(path, old_text, content, hunk.header)) - else: - pos = search_start - match_len = 0 - - content = content[:pos] + new_text + content[pos + match_len:] - cursor = pos + len(new_text) - return content + return f"- {summary.action} {summary.path}{stats}" @tool_parameters( tool_parameters_schema( - patch=StringSchema( - "Full patch text. Use *** Begin Patch / *** End Patch and file sections " - "for Add File, Update File, Delete File, and optional Move to.", - min_length=1, + edits=ArraySchema( + items=ObjectSchema( + path=StringSchema("Relative path to the file to edit."), + action=StringSchema( + "Operation type: replace (find and replace text), add (append new content or create file), delete (remove text).", + enum=["replace", "add", "delete"], + ), + old_text=StringSchema( + "Exact text to search for in the file. Required for replace and delete.", + nullable=True, + ), + new_text=StringSchema( + "Text to replace with or append. Required for replace and add.", + nullable=True, + ), + required=["path", "action"], + ), + description="List of edits to apply. Each edit specifies a file and the change to make.", + min_items=1, + max_items=20, ), dry_run=BooleanSchema( description="Validate and summarize the patch without writing files.", default=False, ), - required=["patch"], ) ) class ApplyPatchTool(_FsTool): - """Apply a structured multi-file patch.""" + """Apply file edits by providing structured edit instructions.""" _scopes = {"core", "subagent"} @property @@ -298,102 +122,190 @@ class ApplyPatchTool(_FsTool): @property def description(self) -> str: return ( - "Default tool for code edits. Apply a structured patch with " - "*** Begin Patch and *** End Patch. Supports Add File, Update File, " - "Delete File, and Move to across one or more files. Use this for " - "multi-file changes, structural edits, generated code, or any edit " - "where a reviewable patch is clearer than an exact replacement. " - "Paths must be relative. Set dry_run=true to validate and preview " - "the change summary without writing files. Use edit_file only for " - "small exact replacements copied from read_file." + "Default tool for code edits. Supports multi-file changes in a single call. " + "Provide a list of structured edits, each specifying a file path, action (replace/add/delete), and the text to change. " + "Paths must be relative. Set dry_run=true to validate and preview without writing files. " + "Use edit_file only for small exact replacements on a single file." ) - async def execute(self, patch: str, dry_run: bool = False, **kwargs: Any) -> str: + async def execute( + self, + edits: list[dict] | None = None, + dry_run: bool = False, + **kwargs: Any, + ) -> str: try: - ops = _parse_patch(patch) + if not edits: + raise _PatchError("must provide edits") + writes: dict[Path, str] = {} deletes: set[Path] = set() summaries: list[_PatchSummary] = [] - for op in ops: - source = self._resolve(op.path) - if op.kind == "add": - if source.exists() or source in writes: - raise _PatchError(f"file to add already exists: {op.path}") - new_content = _lines_to_text(op.add_lines or []) - writes[source] = new_content - deletes.discard(source) - summaries.append(_PatchSummary( - action="add", - path=op.path, - added=_text_line_count(new_content), - )) - continue + for edit in edits: + path = _validate_relative_path(edit["path"]) + action = edit["action"] + source = self._resolve(path) - if op.kind == "delete": - pending_content = writes.get(source) - if pending_content is None and not source.exists(): - raise _PatchError(f"file to delete does not exist: {op.path}") - if pending_content is None and not source.is_file(): - raise _PatchError(f"path to delete is not a file: {op.path}") - deleted_lines = 0 - if pending_content is not None: - deleted_lines = _text_line_count(pending_content) - else: + if action == "add": + new_text = edit.get("new_text") + if new_text is None: + raise _PatchError(f"new_text required for add: {path}") + + pending = writes.get(source) + if pending is not None: + content = pending + exists = True + elif source.exists(): raw = source.read_bytes() try: - deleted_lines = _text_line_count(raw.decode("utf-8")) + content = raw.decode("utf-8") except UnicodeDecodeError: - deleted_lines = 0 - deletes.add(source) - writes.pop(source, None) - summaries.append(_PatchSummary( - action="delete", - path=op.path, - deleted=deleted_lines, - )) - continue + raise _PatchError(f"file is not UTF-8 text: {path}") + exists = True + else: + content = "" + exists = False + + if exists: + uses_crlf = "\r\n" in content + new_norm = content.replace("\r\n", "\n") + new_text.replace("\r\n", "\n") + if new_norm and not new_norm.endswith("\n"): + new_norm += "\n" + if uses_crlf: + new_norm = new_norm.replace("\n", "\r\n") + writes[source] = new_norm + deletes.discard(source) + added, deleted = _line_diff_stats(content, new_norm) + action_name = "update" + else: + new_norm = _lines_to_text(new_text.splitlines()) + writes[source] = new_norm + deletes.discard(source) + added = _text_line_count(new_norm) + deleted = 0 + action_name = "add" + + summaries.append( + _PatchSummary( + action=action_name, path=path, added=added, deleted=deleted + ) + ) + + elif action == "replace": + old_text = edit.get("old_text") or "" + if not old_text: + raise _PatchError(f"old_text required for replace: {path}") + new_text = edit.get("new_text") + if new_text is None: + raise _PatchError(f"new_text required for replace: {path}") + + pending = writes.get(source) + if pending is not None: + content = pending + elif source.exists(): + raw = source.read_bytes() + try: + content = raw.decode("utf-8") + except UnicodeDecodeError: + raise _PatchError(f"file is not UTF-8 text: {path}") + else: + raise _PatchError(f"file to update does not exist: {path}") + + if pending is None and not source.is_file(): + raise _PatchError(f"path to update is not a file: {path}") + + uses_crlf = "\r\n" in content + norm_content = content.replace("\r\n", "\n") + norm_old = old_text.replace("\r\n", "\n") + + pos = norm_content.find(norm_old) + if pos < 0: + raise _PatchError(f"old_text not found in {path}") + if norm_content.find(norm_old, pos + 1) >= 0: + raise _PatchError(f"old_text appears multiple times in {path}") + + new_norm = ( + norm_content[:pos] + + new_text.replace("\r\n", "\n") + + norm_content[pos + len(norm_old) :] + ) + if new_norm and not new_norm.endswith("\n"): + new_norm += "\n" + if uses_crlf: + new_norm = new_norm.replace("\n", "\r\n") + + writes[source] = new_norm + deletes.discard(source) + added, deleted = _line_diff_stats(content, new_norm) + summaries.append( + _PatchSummary( + action="update", path=path, added=added, deleted=deleted + ) + ) + + elif action == "delete": + old_text = edit.get("old_text") or "" + if not old_text: + raise _PatchError(f"old_text required for delete: {path}") + + pending = writes.get(source) + if pending is not None: + content = pending + elif source.exists(): + raw = source.read_bytes() + try: + content = raw.decode("utf-8") + except UnicodeDecodeError: + raise _PatchError(f"file is not UTF-8 text: {path}") + else: + raise _PatchError(f"file to update does not exist: {path}") + + if pending is None and not source.is_file(): + raise _PatchError(f"path to update is not a file: {path}") + + uses_crlf = "\r\n" in content + norm_content = content.replace("\r\n", "\n") + norm_old = old_text.replace("\r\n", "\n") + + pos = norm_content.find(norm_old) + if pos < 0: + raise _PatchError(f"old_text not found in {path}") + if norm_content.find(norm_old, pos + 1) >= 0: + raise _PatchError(f"old_text appears multiple times in {path}") + + if norm_old.strip() == norm_content.strip(): + deletes.add(source) + writes.pop(source, None) + added, deleted = 0, _text_line_count(content) + summaries.append( + _PatchSummary( + action="delete", path=path, added=added, deleted=deleted + ) + ) + else: + new_norm = ( + norm_content[:pos] + norm_content[pos + len(norm_old) :] + ) + if new_norm and not new_norm.endswith("\n"): + new_norm += "\n" + if uses_crlf: + new_norm = new_norm.replace("\n", "\r\n") + writes[source] = new_norm + deletes.discard(source) + added, deleted = _line_diff_stats(content, new_norm) + summaries.append( + _PatchSummary( + action="update", path=path, added=added, deleted=deleted + ) + ) - pending_content = writes.get(source) - if pending_content is None and not source.exists(): - raise _PatchError(f"file to update does not exist: {op.path}") - if pending_content is None and not source.is_file(): - raise _PatchError(f"path to update is not a file: {op.path}") - if pending_content is not None: - content = pending_content else: - raw = source.read_bytes() - try: - content = raw.decode("utf-8") - except UnicodeDecodeError as exc: - raise _PatchError(f"file to update is not UTF-8 text: {op.path}") from exc - uses_crlf = "\r\n" in content - content = content.replace("\r\n", "\n") - new_content = _apply_hunks(op.path, content, op.hunks or []) - added, deleted = _line_diff_stats(content, new_content) - if uses_crlf: - new_content = new_content.replace("\n", "\r\n") - - target = self._resolve(op.new_path) if op.new_path else source - if op.new_path and (target.exists() or target in writes) and target != source: - raise _PatchError(f"move target already exists: {op.new_path}") - writes[target] = new_content - deletes.discard(target) - if target != source: - deletes.add(source) - writes.pop(source, None) - summaries.append(_PatchSummary( - action="move" if op.new_path else "update", - path=op.path, - new_path=op.new_path, - added=added, - deleted=deleted, - )) + raise _PatchError(f"unknown action: {action}") if dry_run: - return ( - "Patch dry-run succeeded:\n" - + "\n".join(_format_summary(summary) for summary in summaries) + return "Patch dry-run succeeded:\n" + "\n".join( + _format_summary(summary) for summary in summaries ) backups: dict[Path, bytes | None] = {} @@ -419,9 +331,8 @@ class ApplyPatchTool(_FsTool): for path in set(writes) | deletes: self._file_states.record_write(path) - return ( - "Patch applied:\n" - + "\n".join(_format_summary(summary) for summary in summaries) + return "Patch applied:\n" + "\n".join( + _format_summary(summary) for summary in summaries ) except PermissionError as exc: return f"Error: {exc}" diff --git a/nanobot/utils/file_edit_events.py b/nanobot/utils/file_edit_events.py index fc561bcb0..fd929134d 100644 --- a/nanobot/utils/file_edit_events.py +++ b/nanobot/utils/file_edit_events.py @@ -215,26 +215,24 @@ def _resolve_apply_patch_paths( ) -> list[Path]: if not isinstance(params, dict): return [] - patch = params.get("patch") - if not isinstance(patch, str) or not patch.strip(): + edits = params.get("edits") + if not isinstance(edits, list) or not edits: return [] if params.get("dry_run") is True: return [] - try: - from nanobot.agent.tools.apply_patch import _parse_patch - - ops = _parse_patch(patch) - except Exception: - return [] resolved: list[Path] = [] - for op in ops: - for raw_path in (op.path, op.new_path): - if not raw_path: - continue - path = _resolve_raw_file_edit_path(tool, workspace, raw_path) - if path is not None: - resolved.append(path) + seen: set[Path] = set() + for edit in edits: + if not isinstance(edit, dict): + continue + raw_path = edit.get("path") + if not isinstance(raw_path, str) or not raw_path.strip(): + continue + path = _resolve_raw_file_edit_path(tool, workspace, raw_path) + if path is not None and path not in seen: + seen.add(path) + resolved.append(path) return resolved @@ -438,16 +436,34 @@ class StreamingFileEditTracker: async def _update_apply_patch(self, state: _StreamingFileEditState) -> None: if _json_bool_true(state.arguments, "dry_run"): return - patch = _extract_json_string_prefix(state.arguments, "patch") - if not patch: - return tool = self._tools.get("apply_patch") if hasattr(self._tools, "get") else None events: list[dict[str, Any]] = [] now = time.monotonic() - for raw_path, added, deleted, delete_file in _streaming_apply_patch_stats(patch): + + path_matches = list(re.finditer(r'"path"\s*:\s*"([^"]+)"', state.arguments)) + if not path_matches: + return + + for i, m in enumerate(path_matches): + raw_path = m.group(1) path = _resolve_raw_file_edit_path(tool, self._workspace, raw_path) if path is None: continue + + segment_start = m.start() + segment_end = path_matches[i + 1].start() if i + 1 < len(path_matches) else len(state.arguments) + segment = state.arguments[segment_start:segment_end] + + action_match = re.search(r'"action"\s*:\s*"(replace|add|delete)"', segment) + action = action_match.group(1) if action_match else "replace" + + old_text = _extract_json_string_prefix(segment, "old_text") or "" + new_text = _extract_json_string_prefix(segment, "new_text") or "" + + added = _text_line_count(new_text) if action in ("replace", "add") else 0 + deleted = _text_line_count(old_text) if action in ("replace", "delete") else 0 + delete_file = action == "delete" + file_state = state.patch_files.get(raw_path) if file_state is None: tracker = FileEditTracker( @@ -779,9 +795,10 @@ class _StreamingFileEditState: arguments = getattr(tool_call, "arguments", None) if not isinstance(arguments, dict): return False - patch = arguments.get("patch") - streamed_patch = _extract_complete_json_string(self.arguments, "patch") - return isinstance(patch, str) and streamed_patch == patch + edits = arguments.get("edits") + if not isinstance(edits, list): + return False + return '"edits"' in self.arguments arguments = getattr(tool_call, "arguments", None) if not isinstance(arguments, dict): return False @@ -849,65 +866,6 @@ def _extract_json_string_prefix(source: str, key: str) -> str | None: return "".join(out) -def _streaming_apply_patch_stats(patch: str) -> list[tuple[str, int, int, bool]]: - stats: dict[str, list[Any]] = {} - order: list[str] = [] - current: str | None = None - - def ensure(path: str, *, delete_file: bool = False) -> list[Any]: - if path not in stats: - stats[path] = [0, 0, False] - order.append(path) - if delete_file: - stats[path][2] = True - return stats[path] - - lines = patch.splitlines() - tail = "" - if patch and not patch.endswith(("\n", "\r")) and lines: - tail = lines.pop() - - for line in lines: - if line.startswith("*** Add File: "): - current = line[len("*** Add File: "):].strip() - if current: - ensure(current) - continue - if line.startswith("*** Update File: "): - current = line[len("*** Update File: "):].strip() - if current: - ensure(current) - continue - if line.startswith("*** Delete File: "): - current = line[len("*** Delete File: "):].strip() - if current: - ensure(current, delete_file=True) - continue - if line.startswith("*** Move to: "): - moved = line[len("*** Move to: "):].strip() - if moved: - current = moved - ensure(current) - continue - if line.startswith("*** "): - current = None - continue - if not current: - continue - if line.startswith("+") and not line.startswith("+++"): - ensure(current)[0] += 1 - elif line.startswith("-") and not line.startswith("---"): - ensure(current)[1] += 1 - - if current and tail: - if tail.startswith("+") and not tail.startswith("+++"): - ensure(current)[0] += 1 - elif tail.startswith("-") and not tail.startswith("---"): - ensure(current)[1] += 1 - - return [(path, int(stats[path][0]), int(stats[path][1]), bool(stats[path][2])) for path in order] - - def _extract_complete_json_string(source: str, key: str) -> str | None: match = re.search(rf'"{re.escape(key)}"\s*:\s*"', source) if match is None: diff --git a/tests/tools/test_apply_patch_tool.py b/tests/tools/test_apply_patch_tool.py index a356b83e8..090dafb2d 100644 --- a/tests/tools/test_apply_patch_tool.py +++ b/tests/tools/test_apply_patch_tool.py @@ -5,283 +5,249 @@ import asyncio from nanobot.agent.tools.apply_patch import ApplyPatchTool -def test_apply_patch_adds_file(tmp_path): +def test_apply_patch_edits_replace(tmp_path): + target = tmp_path / "calc.py" + target.write_text("def add(a, b):\n return a + b\n") tool = ApplyPatchTool(workspace=tmp_path) - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Add File: hello.txt -+Hello -+world -*** End Patch -""" - )) + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "calc.py", + "action": "replace", + "old_text": " return a + b", + "new_text": " return a - b", + } + ] + ) + ) - assert "Patch applied" in result - assert (tmp_path / "hello.txt").read_text() == "Hello\nworld\n" + assert "update calc.py" in result + assert target.read_text() == "def add(a, b):\n return a - b\n" -def test_apply_patch_updates_multiple_hunks(tmp_path): - target = tmp_path / "multi.txt" - target.write_text("line1\nline2\nline3\nline4\n") +def test_apply_patch_edits_add_new_file(tmp_path): tool = ApplyPatchTool(workspace=tmp_path) - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: multi.txt -@@ --line2 -+changed2 -@@ --line4 -+changed4 -*** End Patch -""" - )) + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "config.py", + "action": "add", + "new_text": "DEBUG = True", + } + ] + ) + ) - assert "update multi.txt" in result - assert "(+2/-2)" in result - assert target.read_text() == "line1\nchanged2\nline3\nchanged4\n" + assert "add config.py" in result + assert (tmp_path / "config.py").read_text() == "DEBUG = True\n" -def test_apply_patch_dry_run_validates_without_writing(tmp_path): - target = tmp_path / "dry.txt" - target.write_text("before\n") +def test_apply_patch_edits_add_to_existing_file(tmp_path): + target = tmp_path / "log.py" + target.write_text("import logging\n\nlogger = logging.getLogger(__name__)\n") tool = ApplyPatchTool(workspace=tmp_path) - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: dry.txt -@@ --before -+after -*** Add File: added.txt -+new -*** End Patch -""", - dry_run=True, - )) + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "log.py", + "action": "add", + "new_text": "def debug(msg):\n logger.debug(msg)", + } + ] + ) + ) - assert "Patch dry-run succeeded" in result - assert "- update dry.txt (+1/-1)" in result - assert "- add added.txt (+1/-0)" in result - assert target.read_text() == "before\n" - assert not (tmp_path / "added.txt").exists() + assert "update log.py" in result + assert ( + target.read_text() + == "import logging\n\nlogger = logging.getLogger(__name__)\ndef debug(msg):\n logger.debug(msg)\n" + ) -def test_apply_patch_applies_repeated_update_sections_sequentially(tmp_path): - target = tmp_path / "repeat.txt" - target.write_text("one\ntwo\nthree\n") +def test_apply_patch_edits_delete(tmp_path): + target = tmp_path / "utils.py" + target.write_text("def unused():\n pass\ndef used():\n return 1\n") tool = ApplyPatchTool(workspace=tmp_path) - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: repeat.txt -@@ --one -+ONE -*** Update File: repeat.txt -@@ --three -+THREE -*** End Patch -""" - )) + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "utils.py", + "action": "delete", + "old_text": "def unused():\n pass\n", + } + ] + ) + ) - assert result.count("update repeat.txt") == 2 - assert target.read_text() == "ONE\ntwo\nTHREE\n" + assert "update utils.py" in result + assert target.read_text() == "def used():\n return 1\n" -def test_apply_patch_ignores_standard_no_newline_marker(tmp_path): - target = tmp_path / "plain.txt" - target.write_text("before") - tool = ApplyPatchTool(workspace=tmp_path) - - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: plain.txt -@@ -1,1 +1,1 @@ --before -\\ No newline at end of file -+after -\\ No newline at end of file -*** End Patch -""" - )) - - assert "update plain.txt" in result - assert target.read_text() == "after\n" - - -def test_apply_patch_rejects_empty_hunk(tmp_path): - target = tmp_path / "plain.txt" - target.write_text("before\n") - tool = ApplyPatchTool(workspace=tmp_path) - - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: plain.txt -@@ -*** End Patch -""" - )) - - assert "hunk is empty" in result - assert target.read_text() == "before\n" - - -def test_apply_patch_uses_unified_diff_line_hint(tmp_path): - target = tmp_path / "repeated.txt" - target.write_text("target\nmiddle\ntarget\n") - tool = ApplyPatchTool(workspace=tmp_path) - - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: repeated.txt -@@ -3,1 +3,1 @@ --target -+changed -*** End Patch -""" - )) - - assert "update repeated.txt" in result - assert target.read_text() == "target\nmiddle\nchanged\n" - - -def test_apply_patch_line_hint_does_not_fallback_to_earlier_match(tmp_path): - target = tmp_path / "repeated.txt" - target.write_text("target\nmiddle\nother\n") - tool = ApplyPatchTool(workspace=tmp_path) - - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: repeated.txt -@@ -3,1 +3,1 @@ --target -+changed -*** End Patch -""" - )) - - assert "hunk does not match repeated.txt" in result - assert target.read_text() == "target\nmiddle\nother\n" - - -def test_apply_patch_mismatch_reports_best_match(tmp_path): - target = tmp_path / "near.txt" - target.write_text("alpha\nbeta\ngamma\n") - tool = ApplyPatchTool(workspace=tmp_path) - - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: near.txt -@@ -2,1 +2,1 @@ --betx -+delta -*** End Patch -""" - )) - - assert "hunk does not match near.txt" in result - assert "Best match" in result - assert "line 2" in result - assert target.read_text() == "alpha\nbeta\ngamma\n" - - -def test_apply_patch_moves_and_updates_file(tmp_path): - source = tmp_path / "old" / "name.txt" - source.parent.mkdir() - source.write_text("old content\n") - tool = ApplyPatchTool(workspace=tmp_path) - - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: old/name.txt -*** Move to: renamed/dir/name.txt -@@ --old content -+new content -*** End Patch -""" - )) - - assert "move old/name.txt -> renamed/dir/name.txt" in result - assert not source.exists() - assert (tmp_path / "renamed" / "dir" / "name.txt").read_text() == "new content\n" - - -def test_apply_patch_deletes_file(tmp_path): +def test_apply_patch_edits_delete_entire_file(tmp_path): target = tmp_path / "obsolete.txt" target.write_text("remove me\n") tool = ApplyPatchTool(workspace=tmp_path) - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Delete File: obsolete.txt -*** End Patch -""" - )) + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "obsolete.txt", + "action": "delete", + "old_text": "remove me\n", + } + ] + ) + ) assert "delete obsolete.txt" in result assert not target.exists() -def test_apply_patch_rejects_absolute_and_parent_paths(tmp_path): +def test_apply_patch_edits_batch_multiple_files(tmp_path): + a = tmp_path / "a.py" + a.write_text("X = 1\n") + b = tmp_path / "b.py" + b.write_text("from a import X\nprint(X)\n") tool = ApplyPatchTool(workspace=tmp_path) - absolute = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Add File: /tmp/owned.txt -+nope -*** End Patch -""" - )) - parent = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Add File: ../owned.txt -+nope -*** End Patch -""" - )) + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "a.py", + "action": "replace", + "old_text": "X = 1", + "new_text": "Y = 1", + }, + { + "path": "b.py", + "action": "replace", + "old_text": "from a import X", + "new_text": "from a import Y", + }, + ] + ) + ) + + assert "update a.py" in result + assert "update b.py" in result + assert a.read_text() == "Y = 1\n" + assert b.read_text() == "from a import Y\nprint(X)\n" + + +def test_apply_patch_edits_rejects_ambiguous_old_text(tmp_path): + target = tmp_path / "repeated.txt" + target.write_text("target\nmiddle\ntarget\n") + tool = ApplyPatchTool(workspace=tmp_path) + + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "repeated.txt", + "action": "replace", + "old_text": "target", + "new_text": "changed", + } + ] + ) + ) + + assert "old_text appears multiple times" in result + assert target.read_text() == "target\nmiddle\ntarget\n" + + +def test_apply_patch_edits_dry_run_validates_without_writing(tmp_path): + target = tmp_path / "dry.txt" + target.write_text("before\n") + tool = ApplyPatchTool(workspace=tmp_path) + + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "dry.txt", + "action": "replace", + "old_text": "before", + "new_text": "after", + }, + { + "path": "added.txt", + "action": "add", + "new_text": "new", + }, + ], + dry_run=True, + ) + ) + + assert "Patch dry-run succeeded" in result + assert target.read_text() == "before\n" + assert not (tmp_path / "added.txt").exists() + + +def test_apply_patch_edits_rejects_absolute_and_parent_paths(tmp_path): + tool = ApplyPatchTool(workspace=tmp_path) + + absolute = asyncio.run( + tool.execute( + edits=[ + { + "path": "/tmp/owned.txt", + "action": "add", + "new_text": "nope", + } + ] + ) + ) + parent = asyncio.run( + tool.execute( + edits=[ + { + "path": "../owned.txt", + "action": "add", + "new_text": "nope", + } + ] + ) + ) assert "must be relative" in absolute assert "must not contain '..'" in parent assert not (tmp_path.parent / "owned.txt").exists() -def test_apply_patch_does_not_overwrite_existing_file_with_add(tmp_path): - target = tmp_path / "existing.txt" - target.write_text("keep me\n") - tool = ApplyPatchTool(workspace=tmp_path) - - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Add File: existing.txt -+replace me -*** End Patch -""" - )) - - assert "file to add already exists" in result - assert target.read_text() == "keep me\n" - - -def test_apply_patch_rolls_back_when_late_operation_fails(tmp_path): +def test_apply_patch_edits_rolls_back_when_late_operation_fails(tmp_path): first = tmp_path / "first.txt" first.write_text("before\n") tool = ApplyPatchTool(workspace=tmp_path) - result = asyncio.run(tool.execute( - patch="""*** Begin Patch -*** Update File: first.txt -@@ --before -+after -*** Delete File: missing.txt -*** End Patch -""" - )) + result = asyncio.run( + tool.execute( + edits=[ + { + "path": "first.txt", + "action": "replace", + "old_text": "before", + "new_text": "after", + }, + { + "path": "missing.txt", + "action": "delete", + "old_text": "remove me", + }, + ] + ) + ) - assert "file to delete does not exist" in result + assert "file to update does not exist: missing.txt" in result assert first.read_text() == "before\n" diff --git a/tests/utils/test_file_edit_events.py b/tests/utils/test_file_edit_events.py index 57d1272c9..fe035b41b 100644 --- a/tests/utils/test_file_edit_events.py +++ b/tests/utils/test_file_edit_events.py @@ -89,23 +89,18 @@ def test_apply_patch_prepares_trackers_for_each_touched_file(tmp_path: Path) -> delete_me = tmp_path / "src" / "delete_me.py" delete_me.write_text("gone\n", encoding="utf-8") - patch = """*** Begin Patch -*** Add File: src/new.py -+fresh -*** Update File: src/existing.py -@@ --old -+new - keep -*** Delete File: src/delete_me.py -*** End Patch""" + edits = [ + {"path": "src/new.py", "action": "add", "new_text": "fresh"}, + {"path": "src/existing.py", "action": "replace", "old_text": "old", "new_text": "new"}, + {"path": "src/delete_me.py", "action": "delete", "old_text": "gone\n"}, + ] trackers = prepare_file_edit_trackers( call_id="call-patch", tool_name="apply_patch", tool=None, workspace=tmp_path, - params={"patch": patch}, + params={"edits": edits}, ) assert [tracker.display_path for tracker in trackers] == [ @@ -118,7 +113,7 @@ def test_apply_patch_prepares_trackers_for_each_touched_file(tmp_path: Path) -> existing.write_text("new\nkeep\n", encoding="utf-8") delete_me.unlink() - events = [build_file_edit_end_event(tracker, {"patch": patch}) for tracker in trackers] + events = [build_file_edit_end_event(tracker, {"edits": edits}) for tracker in trackers] by_path = {event["path"]: event for event in events} assert (by_path["src/new.py"]["added"], by_path["src/new.py"]["deleted"]) == (1, 0) assert (by_path["src/existing.py"]["added"], by_path["src/existing.py"]["deleted"]) == (1, 1) @@ -135,12 +130,9 @@ def test_apply_patch_dry_run_does_not_prepare_file_edit_trackers(tmp_path: Path) workspace=tmp_path, params={ "dry_run": True, - "patch": """*** Begin Patch -*** Update File: file.txt -@@ --old -+new -*** End Patch""", + "edits": [ + {"path": "file.txt", "action": "replace", "old_text": "old", "new_text": "new"} + ], }, ) @@ -221,14 +213,8 @@ def test_streaming_apply_patch_tracker_emits_live_counts_per_file(tmp_path: Path "call_id": "call-patch", "name": "apply_patch", "arguments_delta": ( - '{"patch":"*** Begin Patch\\n' - '*** Update File: src/existing.py\\n' - '@@\\n' - '-old\\n' - '+new\\n' - ' keep\\n' - '*** Add File: src/new.py\\n' - '+fresh\\n' + '{"edits":[{"path":"src/existing.py","action":"replace","old_text":"old","new_text":"new"}' + ',{"path":"src/new.py","action":"add","new_text":"fresh"}]}' ), }) @@ -255,9 +241,7 @@ def test_streaming_apply_patch_tracker_skips_dry_run(tmp_path: Path) -> None: "call_id": "call-patch", "name": "apply_patch", "arguments_delta": ( - '{"dry_run":true,"patch":"*** Begin Patch\\n' - '*** Add File: dry.md\\n' - '+preview\\n' + '{"dry_run":true,"edits":[{"path":"dry.md","action":"add","new_text":"preview"}]}' ), })